This is an automated email from the ASF dual-hosted git repository. dsmiley pushed a commit to branch branch_10x in repository https://gitbox.apache.org/repos/asf/solr.git
commit ed08c58ae8585f7dbea5db6c0187ab903278b516 Author: David Smiley <[email protected]> AuthorDate: Fri Nov 28 23:40:04 2025 -0500 SOLR-18005: ConcurrentUpdateJettySolrClient (#3885) Renamed ConcurrentUpdateHttp2SolrClient to ConcurrentUpdateBaseSolrClient, made it abstract, and made a new ConcurrentUpdateJettySolrClient as the one implementation. Moved logic from Http2SolrClient that was only related to this logic, over here. * javadoc record: MissingDoclet didn't consider Java record --- .../apache/lucene/missingdoclet/MissingDoclet.java | 1 + .../SOLR-18005-ConcurrentUpdateJettySolrClient.yml | 8 + ...Client.yml => SOLR-18005-LBAsyncSolrClient.yml} | 4 +- .../org/apache/solr/update/SolrCmdDistributor.java | 4 +- .../apache/solr/update/StreamingSolrClients.java | 15 +- .../solr/core/TestHttpSolrClientProvider.java | 4 +- .../modules/deployment-guide/pages/solrj.adoc | 4 +- ...nt.java => ConcurrentUpdateBaseSolrClient.java} | 111 +++++------- .../impl/ConcurrentUpdateJettySolrClient.java | 198 +++++++++++++++++++++ .../solr/client/solrj/impl/Http2SolrClient.java | 124 +------------ .../solr/client/solrj/impl/StallDetection.java | 6 +- .../SolrExampleStreamingBinaryHttp2Test.java | 3 +- .../embedded/SolrExampleStreamingHttp2Test.java | 20 ++- ...ncurrentUpdateJettySolrClientBadInputTest.java} | 11 +- ...tUpdateJettySolrClientMultiCollectionTest.java} | 12 +- ...va => ConcurrentUpdateJettySolrClientTest.java} | 26 +-- .../client/solrj/impl/Http2SolrClientTest.java | 8 +- .../solrj/apache/ConcurrentUpdateSolrClient.java | 4 +- 18 files changed, 324 insertions(+), 239 deletions(-) diff --git a/build-tools/missing-doclet/src/main/java/org/apache/lucene/missingdoclet/MissingDoclet.java b/build-tools/missing-doclet/src/main/java/org/apache/lucene/missingdoclet/MissingDoclet.java index bb0b0fb3ea0..14487731f6f 100644 --- a/build-tools/missing-doclet/src/main/java/org/apache/lucene/missingdoclet/MissingDoclet.java +++ b/build-tools/missing-doclet/src/main/java/org/apache/lucene/missingdoclet/MissingDoclet.java @@ -237,6 +237,7 @@ public class MissingDoclet extends StandardDoclet { case INTERFACE: case ENUM: case ANNOTATION_TYPE: + case RECORD: if (level(element) >= CLASS) { checkComment(element); for (var subElement : element.getEnclosedElements()) { diff --git a/changelog/unreleased/SOLR-18005-ConcurrentUpdateJettySolrClient.yml b/changelog/unreleased/SOLR-18005-ConcurrentUpdateJettySolrClient.yml new file mode 100644 index 00000000000..905a259630f --- /dev/null +++ b/changelog/unreleased/SOLR-18005-ConcurrentUpdateJettySolrClient.yml @@ -0,0 +1,8 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Renamed ConcurrentUpdateHttp2SolrClient to ConcurrentUpdateBaseSolrClient, made abstract, with a new ConcurrentUpdateJettySolrClient as it's only implementation. +type: other # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: David Smiley +links: + - name: SOLR-18005 + url: https://issues.apache.org/jira/browse/SOLR-18005 diff --git a/changelog/unreleased/SOLR-17161-LBAsyncSolrClient.yml b/changelog/unreleased/SOLR-18005-LBAsyncSolrClient.yml similarity index 80% rename from changelog/unreleased/SOLR-17161-LBAsyncSolrClient.yml rename to changelog/unreleased/SOLR-18005-LBAsyncSolrClient.yml index f722b2ffb46..03d328d7128 100644 --- a/changelog/unreleased/SOLR-17161-LBAsyncSolrClient.yml +++ b/changelog/unreleased/SOLR-18005-LBAsyncSolrClient.yml @@ -4,5 +4,5 @@ type: other # added, changed, fixed, deprecated, removed, dependency_update, sec authors: - name: David Smiley links: - - name: SOLR-17161 - url: https://issues.apache.org/jira/browse/SOLR-17161 + - name: SOLR-18005 + url: https://issues.apache.org/jira/browse/SOLR-18005 diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 5046d9ed876..ec4e7c1f14c 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient; import org.apache.solr.client.solrj.impl.JavaBinResponseParser; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -482,7 +482,7 @@ public class SolrCmdDistributor implements Closeable { /** * NOTE: This is the request that happened to be executed when this error was <b>triggered</b> * the error, but because of how {@link StreamingSolrClients} uses {@link - * ConcurrentUpdateHttp2SolrClient} it might not actaully be the request that <b>caused</b> the + * ConcurrentUpdateBaseSolrClient} it might not actaully be the request that <b>caused</b> the * error -- multiple requests are merged & processed as a sequential batch. */ public Req req; diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java index 5790fcf9aea..35bfc79dc63 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java @@ -27,7 +27,8 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateJettySolrClient; import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.util.StrUtils; @@ -46,7 +47,7 @@ public class StreamingSolrClients { private Http2SolrClient httpClient; - private Map<String, ConcurrentUpdateHttp2SolrClient> solrClients = new HashMap<>(); + private Map<String, ConcurrentUpdateBaseSolrClient> solrClients = new HashMap<>(); private List<SolrError> errors = Collections.synchronizedList(new ArrayList<>()); private ExecutorService updateExecutor; @@ -66,7 +67,7 @@ public class StreamingSolrClients { public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) { String url = getFullUrl(req.node.getUrl()); - ConcurrentUpdateHttp2SolrClient client = solrClients.get(url); + ConcurrentUpdateBaseSolrClient client = solrClients.get(url); if (client == null) { // NOTE: increasing to more than 1 threadCount for the client could cause updates to be // reordered on a greater scale since the current behavior is to only increase the number of @@ -92,13 +93,13 @@ public class StreamingSolrClients { } public synchronized void blockUntilFinished() throws IOException { - for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) { + for (ConcurrentUpdateBaseSolrClient client : solrClients.values()) { client.blockUntilFinished(); } } public synchronized void shutdown() { - for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) { + for (ConcurrentUpdateBaseSolrClient client : solrClients.values()) { client.close(); } } @@ -122,7 +123,7 @@ public class StreamingSolrClients { } } -class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient { +class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateJettySolrClient { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SolrCmdDistributor.Req req; private final List<SolrError> errors; @@ -154,7 +155,7 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2Solr req.trackRequestResult(resp, respBody, true); } - static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder { + static class Builder extends ConcurrentUpdateJettySolrClient.Builder { protected SolrCmdDistributor.Req req; protected List<SolrError> errors; diff --git a/solr/core/src/test/org/apache/solr/core/TestHttpSolrClientProvider.java b/solr/core/src/test/org/apache/solr/core/TestHttpSolrClientProvider.java index 29a1844869f..42e98a0c282 100644 --- a/solr/core/src/test/org/apache/solr/core/TestHttpSolrClientProvider.java +++ b/solr/core/src/test/org/apache/solr/core/TestHttpSolrClientProvider.java @@ -48,7 +48,7 @@ public class TestHttpSolrClientProvider extends SolrTestCase { public void test_when_updateShardHandler_cfg_is_null() { try (var httpSolrClientProvider = new HttpSolrClientProvider(null, parentSolrMetricCtx); ) { assertEquals( - httpSolrClientProvider.getSolrClient().getIdleTimeout(), + httpSolrClientProvider.getSolrClient().getIdleTimeoutMillis(), SolrHttpConstants.DEFAULT_SO_TIMEOUT); } } @@ -59,7 +59,7 @@ public class TestHttpSolrClientProvider extends SolrTestCase { assertNotEquals(idleTimeout, UpdateShardHandlerConfig.DEFAULT.getDistributedSocketTimeout()); UpdateShardHandlerConfig cfg = new UpdateShardHandlerConfig(-1, -1, idleTimeout, -1, null, -1); try (var httpSolrClientProvider = new HttpSolrClientProvider(cfg, parentSolrMetricCtx); ) { - assertEquals(httpSolrClientProvider.getSolrClient().getIdleTimeout(), idleTimeout); + assertEquals(httpSolrClientProvider.getSolrClient().getIdleTimeoutMillis(), idleTimeout); } } } diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc index 5582621ec7b..6df63275c39 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc @@ -99,7 +99,7 @@ Requests are sent in the form of {solr-javadocs}/solrj/org/apache/solr/client/so - {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.html[`HttpJdkSolrClient`] - a general purpose client based on JDK HttpClient. Supports HTTP/2 and HTTP/1.1, async, non-blocking. Has no dependencies. - {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/LBSolrClient.html[`LBSolrClient`] - an internal client that delegates to other clients pointed at different URLs for fail-over/availability. Adjusts the list of "in-service" nodes based on node health. - {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/CloudSolrClient.html[`CloudSolrClient`] - the ideal client for SolrCloud. Using the "cluster state", it routes requests to the optimal nodes, including splitting out the documents in an UpdateRequest to different nodes. -- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.html[`ConcurrentUpdateHttp2SolrClient`] - geared towards indexing-centric workloads. +- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClient.html[`ConcurrentUpdateJettySolrClient`] - geared towards indexing-centric workloads. Buffers documents internally before sending larger batches to Solr. === Common Configuration Options @@ -147,7 +147,7 @@ include::example$UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrclient-timeou When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on. -`ConcurrentUpdateSolrClient` and its counterpart `ConcurrentUpdateHttp2SolrClient` also implement a stall prevention +`ConcurrentUpdateSolrClient` and its counterpart `ConcurrentUpdateBaseSolrClient` also implement a stall prevention timeout that allows requests to non-responsive nodes to fail quicker than waiting for a socket timeout. The default value of this timeout is set to 15000 ms and can be adjusted by a system property `solr.cloud.client.stallTime`. This value should be smaller than `solr.jetty.http.idleTimeout` (Which is 120000 ms by default) and greater than the diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java similarity index 88% rename from solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java rename to solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java index 0b9684f6b5b..745bc1ffd2d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java @@ -38,6 +38,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.eclipse.jetty.client.InputStreamResponseListener; @@ -47,29 +48,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -/** A Solr client using {@link Http2SolrClient} to send concurrent updates to Solr. */ -public class ConcurrentUpdateHttp2SolrClient extends SolrClient { +/** A ConcurrentUpdate {@link SolrClient} -- it sends updates concurrently and asynchronously. */ +public abstract class ConcurrentUpdateBaseSolrClient extends SolrClient { + // formerly known as ConcurrentUpdateBaseSolrClient private static final long serialVersionUID = 1L; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Update END_UPDATE = new Update(null, null); - private Http2SolrClient client; - private final String basePath; - private final CustomBlockingQueue<Update> queue; + private HttpSolrClientBase client; + protected final String basePath; + protected final CustomBlockingQueue<Update> queue; private final ExecutorService scheduler; private final Queue<Runner> runners; private final int threadCount; + private final long idleTimeoutMillis; private boolean shutdownClient; private boolean shutdownExecutor; - private long pollQueueTimeMillis; + protected long pollQueueTimeMillis; private final boolean streamDeletes; private volatile boolean closed; private volatile CountDownLatch lock = null; // used to block everything protected StallDetection stallDetection; - private static class CustomBlockingQueue<E> implements Iterable<E> { + protected static class CustomBlockingQueue<E> implements Iterable<E> { private final BlockingQueue<E> queue; private final Semaphore available; private final int queueSize; @@ -140,9 +143,15 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } - protected ConcurrentUpdateHttp2SolrClient(Builder builder) { + protected ConcurrentUpdateBaseSolrClient(Builder builder) { + if (builder.baseSolrUrl == null) { + throw new IllegalArgumentException( + "Cannot create HttpSolrClient without a valid baseSolrUrl!"); + } + this.client = builder.client; - this.shutdownClient = builder.closeHttp2Client; + this.shutdownClient = builder.closeHttpClient; + this.idleTimeoutMillis = builder.idleTimeoutMillis; this.threadCount = builder.threadCount; this.queue = new CustomBlockingQueue<>(builder.queueSize, threadCount, END_UPDATE); this.runners = new ArrayDeque<>(); @@ -177,7 +186,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } /** Class representing an UpdateRequest and an optional collection. */ - private record Update(UpdateRequest request, String collection) {} + protected record Update(UpdateRequest request, String collection) {} /** Opens a connection and sends everything... */ class Runner implements Runnable { @@ -238,28 +247,10 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } InputStreamResponseListener responseListener = null; - try (Http2SolrClient.OutStream out = - client.initOutStream(basePath, update.request(), update.collection())) { - Update upd = update; - while (upd != null) { - UpdateRequest req = upd.request(); - if (!out.belongToThisStream(req, upd.collection())) { - // Request has different params or destination core/collection, return to queue - queue.add(upd); - break; - } - client.send(out, upd.request(), upd.collection()); - out.flush(); - - notifyQueueAndRunnersIfEmptyQueue(); - upd = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS); - } - responseListener = out.getResponseListener(); - } + responseListener = doSendUpdateStream(update); // just wait for the headers, so the idle timeout is sensible - Response response = - responseListener.get(client.getIdleTimeout(), TimeUnit.MILLISECONDS); + Response response = responseListener.get(idleTimeoutMillis, TimeUnit.MILLISECONDS); rspBody = responseListener.getInputStream(); int statusCode = response.getStatus(); @@ -288,8 +279,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { // don't want to fail to report error if parsing the response fails log.warn("Failed to parse error response from {} due to: ", basePath, exc); } finally { - solrExc = - new SolrClient.RemoteSolrException(basePath, statusCode, msg.toString(), null); + solrExc = new RemoteSolrException(basePath, statusCode, msg.toString(), null); if (metadata != null) { solrExc.setMetadata(metadata); } @@ -316,6 +306,9 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } + protected abstract InputStreamResponseListener doSendUpdateStream(Update update) + throws IOException, InterruptedException; + private void consumeFully(InputStream is) { if (is != null) { try (is) { @@ -330,7 +323,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } - private void notifyQueueAndRunnersIfEmptyQueue() { + protected void notifyQueueAndRunnersIfEmptyQueue() { if (queue.size() == 0) { synchronized (queue) { // queue may be empty @@ -348,7 +341,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() } private void addRunner() { MDC.put( - "ConcurrentUpdateHttp2SolrClient.url", + "ConcurrentUpdateBaseSolrClient.url", String.valueOf(client.getBaseURL())); // MDC can't have null value try { Runner r = new Runner(); @@ -362,7 +355,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { throw e; } } finally { - MDC.remove("ConcurrentUpdateHttp2SolrClient.url"); + MDC.remove("ConcurrentUpdateBaseSolrClient.url"); } } @@ -605,7 +598,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { interruptRunnerThreadsPolling(); } } finally { - if (shutdownClient) client.close(); + if (shutdownClient) IOUtils.closeQuietly(client); } } @@ -639,16 +632,17 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } - /** Constructs {@link ConcurrentUpdateHttp2SolrClient} instances from provided configuration. */ - public static class Builder { - protected Http2SolrClient client; + /** Constructs {@link ConcurrentUpdateBaseSolrClient} instances from provided configuration. */ + public abstract static class Builder { + public long idleTimeoutMillis; + protected HttpSolrClientBase client; protected String baseSolrUrl; protected String defaultCollection; protected int queueSize = 10; protected int threadCount; protected ExecutorService executorService; protected boolean streamDeletes; - protected boolean closeHttp2Client; + protected boolean closeHttpClient; private long pollQueueTimeMillis; /** @@ -657,7 +651,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { * <p>The provided URL must point to the root Solr path (i.e. "/solr"), for example: * * <pre> - * SolrClient client = new ConcurrentUpdateHttp2SolrClient.Builder("http://my-solr-server:8983/solr", http2Client) + * SolrClient client = new ConcurrentUpdateJettySolrClient.Builder("http://my-solr-server:8983/solr", http2Client) * .withDefaultCollection("core1") * .build(); * QueryResponse resp = client.query(new SolrQuery("*:*")); @@ -665,11 +659,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { * * @param baseSolrUrl a URL pointing to the root Solr path, typically of the form * "http[s]://host:port/solr" - * @param client a client for this ConcurrentUpdateHttp2SolrClient to use for all requests + * @param client a client for this ConcurrentUpdateJettySolrClient to use for all requests * internally. Callers are responsible for closing the provided client (after closing any * clients created by this builder) */ - public Builder(String baseSolrUrl, Http2SolrClient client) { + public Builder(String baseSolrUrl, HttpSolrClientBase client) { this(baseSolrUrl, client, false); } @@ -679,7 +673,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { * <p>The provided URL must point to the root Solr path (i.e. "/solr"), for example: * * <pre> - * SolrClient client = new ConcurrentUpdateHttp2SolrClient.Builder("http://my-solr-server:8983/solr", http2Client) + * SolrClient client = new ConcurrentUpdateJettySolrClient.Builder("http://my-solr-server:8983/solr", http2Client) * .withDefaultCollection("core1") * .build(); * QueryResponse resp = client.query(new SolrQuery("*:*")); @@ -687,16 +681,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { * * @param baseSolrUrl a URL pointing to the root Solr path, typically of the form * "http[s]://host:port/solr" - * @param client a client for this ConcurrentUpdateHttp2SolrClient to use for all requests + * @param client a client for this ConcurrentUpdateJettySolrClient to use for all requests * internally. - * @param closeHttp2Client a boolean flag indicating whether the created - * ConcurrentUpdateHttp2SolrClient should assume responsibility for closing the provided + * @param closeHttpClient a boolean flag indicating whether the created + * ConcurrentUpdateBaseSolrClient should assume responsibility for closing the provided * 'client' */ - public Builder(String baseSolrUrl, Http2SolrClient client, boolean closeHttp2Client) { + public Builder(String baseSolrUrl, HttpSolrClientBase client, boolean closeHttpClient) { this.baseSolrUrl = baseSolrUrl; this.client = client; - this.closeHttp2Client = closeHttp2Client; + this.closeHttpClient = closeHttpClient; } /** @@ -706,7 +700,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { * <p>This value should be carefully paired with the number of queue-consumer threads. A queue * with a maximum size set too high may require more memory. A queue with a maximum size set too * low may suffer decreased throughput as {@link - * ConcurrentUpdateHttp2SolrClient#request(SolrRequest)} calls block waiting to add requests to + * ConcurrentUpdateBaseSolrClient#request(SolrRequest)} calls block waiting to add requests to * the queue. * * <p>If not set, this defaults to 10. @@ -722,14 +716,14 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } /** - * The maximum number of threads used to empty {@link ConcurrentUpdateHttp2SolrClient}s queue. + * The maximum number of threads used to empty {@link ConcurrentUpdateBaseSolrClient}s queue. * * <p>Threads are created when documents are added to the client's internal queue and exit when * no updates remain in the queue. * * <p>This value should be carefully paired with the maximum queue capacity. A client with too * few threads may suffer decreased throughput as the queue fills up and {@link - * ConcurrentUpdateHttp2SolrClient#request(SolrRequest)} calls block waiting to add requests to + * ConcurrentUpdateBaseSolrClient#request(SolrRequest)} calls block waiting to add requests to * the queue. */ public Builder withThreadCount(int threadCount) { @@ -763,7 +757,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { /** * Configures created clients to not stream delete requests. * - * <p>With this option set when the created ConcurrentUpdateHttp2SolrClient sends a delete + * <p>With this option set when the created ConcurrentUpdateBaseSolrClient sends a delete * request it will first will lock the queue and block until all queued updates have been sent, * and then send the delete request. */ @@ -787,15 +781,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } /** - * Create a {@link ConcurrentUpdateHttp2SolrClient} based on the provided configuration options. + * Create a {@link ConcurrentUpdateBaseSolrClient} based on the provided configuration options. */ - public ConcurrentUpdateHttp2SolrClient build() { - if (baseSolrUrl == null) { - throw new IllegalArgumentException( - "Cannot create HttpSolrClient without a valid baseSolrUrl!"); - } - - return new ConcurrentUpdateHttp2SolrClient(this); - } + public abstract ConcurrentUpdateBaseSolrClient build(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClient.java new file mode 100644 index 00000000000..866ac66729e --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClient.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.eclipse.jetty.client.InputStreamResponseListener; +import org.eclipse.jetty.client.OutputStreamRequestContent; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.http.HttpMethod; + +/** A ConcurrentUpdate SolrClient using {@link Http2SolrClient}. */ +public class ConcurrentUpdateJettySolrClient extends ConcurrentUpdateBaseSolrClient { + protected static final Charset FALLBACK_CHARSET = StandardCharsets.UTF_8; + + private final Http2SolrClient client; + + public static class Builder extends ConcurrentUpdateBaseSolrClient.Builder { + /** + * @see ConcurrentUpdateBaseSolrClient.Builder#Builder(String, HttpSolrClientBase) + */ + public Builder(String baseUrl, Http2SolrClient client) { + this(baseUrl, client, false); + } + + /** + * @see ConcurrentUpdateBaseSolrClient.Builder#Builder(String, HttpSolrClientBase, boolean) + */ + public Builder(String baseSolrUrl, Http2SolrClient client, boolean closeHttpClient) { + super(baseSolrUrl, client, closeHttpClient); + this.idleTimeoutMillis = client.getIdleTimeoutMillis(); + } + + @Override + public ConcurrentUpdateJettySolrClient build() { + return new ConcurrentUpdateJettySolrClient(this); + } + } + + protected ConcurrentUpdateJettySolrClient(Builder builder) { + super(builder); + this.client = (Http2SolrClient) builder.client; + } + + @Override + protected InputStreamResponseListener doSendUpdateStream( + ConcurrentUpdateBaseSolrClient.Update update) throws IOException, InterruptedException { + InputStreamResponseListener responseListener; + try (OutStream out = initOutStream(basePath, update.request(), update.collection())) { + ConcurrentUpdateBaseSolrClient.Update upd = update; + while (upd != null) { + UpdateRequest req = upd.request(); + if (!out.belongToThisStream(req, upd.collection())) { + // Request has different params or destination core/collection, return to queue + queue.add(upd); + break; + } + send(out, upd.request(), upd.collection()); + out.flush(); + + notifyQueueAndRunnersIfEmptyQueue(); + upd = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS); + } + responseListener = out.getResponseListener(); + } + return responseListener; + } + + private static class OutStream implements Closeable { + private final String origCollection; + private final SolrParams origParams; + private final OutputStreamRequestContent content; + private final InputStreamResponseListener responseListener; + private final boolean isXml; + + public OutStream( + String origCollection, + SolrParams origParams, + OutputStreamRequestContent content, + InputStreamResponseListener responseListener, + boolean isXml) { + this.origCollection = origCollection; + this.origParams = origParams; + this.content = content; + this.responseListener = responseListener; + this.isXml = isXml; + } + + boolean belongToThisStream(SolrRequest<?> solrRequest, String collection) { + return origParams.equals(solrRequest.getParams()) + && Objects.equals(origCollection, collection); + } + + public void write(byte[] b) throws IOException { + this.content.getOutputStream().write(b); + } + + public void flush() throws IOException { + this.content.getOutputStream().flush(); + } + + @Override + public void close() throws IOException { + if (isXml) { + write("</stream>".getBytes(FALLBACK_CHARSET)); + } + this.content.getOutputStream().close(); + } + + // TODO this class should be hidden + public InputStreamResponseListener getResponseListener() { + return responseListener; + } + } + + private OutStream initOutStream(String baseUrl, UpdateRequest updateRequest, String collection) + throws IOException { + String contentType = client.requestWriter.getUpdateContentType(); + final SolrParams origParams = updateRequest.getParams(); + ModifiableSolrParams requestParams = + client.initializeSolrParams(updateRequest, client.responseParser(updateRequest)); + + String basePath = baseUrl; + if (collection != null) basePath += "/" + collection; + if (!basePath.endsWith("/")) basePath += "/"; + + OutputStreamRequestContent content = new OutputStreamRequestContent(contentType); + Request postRequest = + client + .getHttpClient() + .newRequest(basePath + "update" + requestParams.toQueryString()) + .method(HttpMethod.POST) + .body(content); + client.decorateRequest(postRequest, updateRequest, false); + InputStreamResponseListener responseListener = + new Http2SolrClient.InputStreamReleaseTrackingResponseListener(); + postRequest.send(responseListener); + + boolean isXml = ClientUtils.TEXT_XML.equals(client.requestWriter.getUpdateContentType()); + OutStream outStream = new OutStream(collection, origParams, content, responseListener, isXml); + if (isXml) { + outStream.write("<stream>".getBytes(FALLBACK_CHARSET)); + } + return outStream; + } + + private void send(OutStream outStream, SolrRequest<?> req, String collection) throws IOException { + assert outStream.belongToThisStream(req, collection); + client.requestWriter.write(req, outStream.content.getOutputStream()); + if (outStream.isXml) { + // check for commit or optimize + SolrParams params = req.getParams(); + assert params != null : "params should not be null"; + if (params != null) { + String fmt = null; + if (params.getBool(UpdateParams.OPTIMIZE, false)) { + fmt = "<optimize waitSearcher=\"%s\" />"; + } else if (params.getBool(UpdateParams.COMMIT, false)) { + fmt = "<commit waitSearcher=\"%s\" />"; + } + if (fmt != null) { + byte[] content = + String.format( + Locale.ROOT, fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false) + "") + .getBytes(FALLBACK_CHARSET); + outStream.write(content); + } + } + } + outStream.flush(); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java index d8005979e90..7a377946811 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java @@ -16,7 +16,6 @@ */ package org.apache.solr.client.solrj.impl; -import java.io.Closeable; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -29,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -49,13 +47,9 @@ import org.apache.solr.client.solrj.embedded.SSLConfig; import org.apache.solr.client.solrj.impl.HttpListenerFactory.RequestResponseListener; import org.apache.solr.client.solrj.jetty.LBJettySolrClient; import org.apache.solr.client.solrj.request.RequestWriter; -import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.params.SolrParams; -import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.ContentStream; -import org.apache.solr.common.util.EnvUtils; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.ObjectReleaseTracker; @@ -239,10 +233,8 @@ public class Http2SolrClient extends HttpSolrClientBase { : sslConfig.createClientContextFactory(); Long keyStoreReloadIntervalSecs = builder.keyStoreReloadIntervalSecs; - if (keyStoreReloadIntervalSecs == null - && EnvUtils.getPropertyAsBool("solr.keystore.reload.enabled", false)) { - keyStoreReloadIntervalSecs = - EnvUtils.getPropertyAsLong("solr.jetty.ssl.context.reload.scan.interval.secs", 30l); + if (keyStoreReloadIntervalSecs == null && Boolean.getBoolean("solr.keyStoreReload.enabled")) { + keyStoreReloadIntervalSecs = Long.getLong("solr.jetty.sslContext.reload.scanInterval", 30); } if (sslContextFactory != null && sslContextFactory.getKeyStoreResource() != null @@ -381,112 +373,10 @@ public class Http2SolrClient extends HttpSolrClientBase { } /** (visible for testing) */ - public long getIdleTimeout() { + public long getIdleTimeoutMillis() { return idleTimeoutMillis; } - public static class OutStream implements Closeable { - private final String origCollection; - private final SolrParams origParams; - private final OutputStreamRequestContent content; - private final InputStreamResponseListener responseListener; - private final boolean isXml; - - public OutStream( - String origCollection, - SolrParams origParams, - OutputStreamRequestContent content, - InputStreamResponseListener responseListener, - boolean isXml) { - this.origCollection = origCollection; - this.origParams = origParams; - this.content = content; - this.responseListener = responseListener; - this.isXml = isXml; - } - - boolean belongToThisStream(SolrRequest<?> solrRequest, String collection) { - return origParams.equals(solrRequest.getParams()) - && Objects.equals(origCollection, collection); - } - - public void write(byte[] b) throws IOException { - this.content.getOutputStream().write(b); - } - - public void flush() throws IOException { - this.content.getOutputStream().flush(); - } - - @Override - public void close() throws IOException { - if (isXml) { - write("</stream>".getBytes(FALLBACK_CHARSET)); - } - this.content.getOutputStream().close(); - } - - // TODO this class should be hidden - public InputStreamResponseListener getResponseListener() { - return responseListener; - } - } - - public OutStream initOutStream(String baseUrl, UpdateRequest updateRequest, String collection) - throws IOException { - String contentType = requestWriter.getUpdateContentType(); - final SolrParams origParams = updateRequest.getParams(); - ModifiableSolrParams requestParams = - initializeSolrParams(updateRequest, responseParser(updateRequest)); - - String basePath = baseUrl; - if (collection != null) basePath += "/" + collection; - if (!basePath.endsWith("/")) basePath += "/"; - - OutputStreamRequestContent content = new OutputStreamRequestContent(contentType); - Request postRequest = - httpClient - .newRequest(basePath + "update" + requestParams.toQueryString()) - .method(HttpMethod.POST) - .body(content); - decorateRequest(postRequest, updateRequest, false); - InputStreamResponseListener responseListener = new InputStreamReleaseTrackingResponseListener(); - postRequest.send(responseListener); - - boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType()); - OutStream outStream = new OutStream(collection, origParams, content, responseListener, isXml); - if (isXml) { - outStream.write("<stream>".getBytes(FALLBACK_CHARSET)); - } - return outStream; - } - - public void send(OutStream outStream, SolrRequest<?> req, String collection) throws IOException { - assert outStream.belongToThisStream(req, collection); - this.requestWriter.write(req, outStream.content.getOutputStream()); - if (outStream.isXml) { - // check for commit or optimize - SolrParams params = req.getParams(); - assert params != null : "params should not be null"; - if (params != null) { - String fmt = null; - if (params.getBool(UpdateParams.OPTIMIZE, false)) { - fmt = "<optimize waitSearcher=\"%s\" />"; - } else if (params.getBool(UpdateParams.COMMIT, false)) { - fmt = "<commit waitSearcher=\"%s\" />"; - } - if (fmt != null) { - byte[] content = - String.format( - Locale.ROOT, fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false) + "") - .getBytes(FALLBACK_CHARSET); - outStream.write(content); - } - } - } - outStream.flush(); - } - @Override public CompletableFuture<NamedList<Object>> requestAsync( final SolrRequest<?> solrRequest, String collection) { @@ -614,6 +504,7 @@ public class Http2SolrClient extends HttpSolrClientBase { } @Override + @Deprecated public NamedList<Object> requestWithBaseUrl( String baseUrl, SolrRequest<?> solrRequest, String collection) throws SolrServerException, IOException { @@ -687,7 +578,7 @@ public class Http2SolrClient extends HttpSolrClientBase { } } - private void decorateRequest(Request req, SolrRequest<?> solrRequest, boolean isAsync) { + protected void decorateRequest(Request req, SolrRequest<?> solrRequest, boolean isAsync) { req.headers(headers -> headers.remove(HttpHeader.ACCEPT_ENCODING)); req.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); req.timeout(requestTimeoutMillis, TimeUnit.MILLISECONDS); @@ -1046,7 +937,7 @@ public class Http2SolrClient extends HttpSolrClientBase { if (cookieStore == null) { return cookieStore; } - if (!EnvUtils.getPropertyAsBool("solr.solrj.http.cookies.enabled", false)) { + if (Boolean.getBoolean("solr.http.disableCookies")) { return new HttpCookieStore.Empty(); } /* @@ -1168,8 +1059,7 @@ public class Http2SolrClient extends HttpSolrClientBase { * * @see ObjectReleaseTracker */ - private static class InputStreamReleaseTrackingResponseListener - extends InputStreamResponseListener { + static class InputStreamReleaseTrackingResponseListener extends InputStreamResponseListener { @Override public InputStream getInputStream() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java index 2530e5c0fb8..4d8f6737e13 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java @@ -25,9 +25,9 @@ import java.util.function.IntSupplier; /** * Utility class for detecting stalls in request processing. * - * <p>This class is used by {@link ConcurrentUpdateHttp2SolrClient} to detect when request - * processing has stalled, which can happen if the server is unresponsive or if there's a problem - * with the connection. + * <p>This class is used by {@link ConcurrentUpdateBaseSolrClient} to detect when request processing + * has stalled, which can happen if the server is unresponsive or if there's a problem with the + * connection. */ public class StallDetection { private final LongAdder processedCount; diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java index 81e9aca90bb..46369c1f0cd 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java @@ -23,7 +23,6 @@ import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.StreamingResponseCallback; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient; import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.impl.JavaBinRequestWriter; import org.apache.solr.client.solrj.impl.JavaBinResponseParser; @@ -44,7 +43,7 @@ public class SolrExampleStreamingBinaryHttp2Test extends SolrExampleStreamingHtt .withRequestWriter(new JavaBinRequestWriter()) .withResponseParser(new JavaBinResponseParser()) .build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = + var concurrentClient = new ErrorTrackingConcurrentUpdateSolrClient.Builder(url, solrClient) .withDefaultCollection(DEFAULT_TEST_CORENAME) .withQueueSize(2) diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingHttp2Test.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingHttp2Test.java index 00087381f1f..cf7bc4bd10e 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingHttp2Test.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingHttp2Test.java @@ -23,7 +23,7 @@ import java.util.EnumSet; import java.util.List; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrExampleTests; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateJettySolrClient; import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.impl.XMLRequestWriter; import org.apache.solr.client.solrj.impl.XMLResponseParser; @@ -52,7 +52,7 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { .withRequestWriter(new XMLRequestWriter()) .withResponseParser(new XMLResponseParser()) .build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = + var concurrentClient = new ErrorTrackingConcurrentUpdateSolrClient.Builder(url, solrClient) .withDefaultCollection(DEFAULT_TEST_CORENAME) .withQueueSize(2) @@ -66,7 +66,7 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { final List<Throwable> failures = new ArrayList<>(); final String serverUrl = getBaseUrl(); try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = + var concurrentClient = new FailureRecordingConcurrentUpdateSolrClient.Builder(serverUrl, http2Client) .withDefaultCollection(DEFAULT_TEST_CORENAME) .withQueueSize(2) @@ -94,10 +94,11 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { } } - static class FailureRecordingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient { + static class FailureRecordingConcurrentUpdateSolrClient extends ConcurrentUpdateJettySolrClient { private final List<Throwable> failures = new ArrayList<>(); - public FailureRecordingConcurrentUpdateSolrClient(Builder builder) { + public FailureRecordingConcurrentUpdateSolrClient( + ConcurrentUpdateJettySolrClient.Builder builder) { super(builder); } @@ -106,7 +107,7 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { failures.add(ex); } - static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder { + static class Builder extends ConcurrentUpdateJettySolrClient.Builder { public Builder(String baseSolrUrl, Http2SolrClient http2Client) { super(baseSolrUrl, http2Client); } @@ -119,10 +120,11 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { } public static class ErrorTrackingConcurrentUpdateSolrClient - extends ConcurrentUpdateHttp2SolrClient { + extends ConcurrentUpdateJettySolrClient { public Throwable lastError = null; - public ErrorTrackingConcurrentUpdateSolrClient(Builder builder) { + public ErrorTrackingConcurrentUpdateSolrClient( + ConcurrentUpdateJettySolrClient.Builder builder) { super(builder); } @@ -131,7 +133,7 @@ public class SolrExampleStreamingHttp2Test extends SolrExampleTests { lastError = ex; } - public static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder { + public static class Builder extends ConcurrentUpdateJettySolrClient.Builder { public Builder(String baseSolrUrl, Http2SolrClient http2Client) { super(baseSolrUrl, http2Client, true); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientBadInputTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientBadInputTest.java similarity index 93% rename from solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientBadInputTest.java rename to solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientBadInputTest.java index 1b21772d88c..e6adb0d0987 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientBadInputTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientBadInputTest.java @@ -20,12 +20,11 @@ package org.apache.solr.client.solrj.impl; import java.util.ArrayList; import java.util.List; import org.apache.solr.SolrJettyTestBase; -import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.embedded.JettyConfig; import org.junit.BeforeClass; import org.junit.Test; -public class ConcurrentUpdateHttp2SolrClientBadInputTest extends SolrJettyTestBase { +public class ConcurrentUpdateJettySolrClientBadInputTest extends SolrJettyTestBase { private static final List<String> NULL_STR_LIST = null; private static final List<String> EMPTY_STR_LIST = new ArrayList<>(); private static final String ANY_COLLECTION = "ANY_COLLECTION"; @@ -42,8 +41,8 @@ public class ConcurrentUpdateHttp2SolrClientBadInputTest extends SolrJettyTestBa public void testDeleteByIdReportsInvalidIdLists() throws Exception { try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - SolrClient client = - new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client) + var client = + new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client) .withDefaultCollection(ANY_COLLECTION) .withQueueSize(ANY_QUEUE_SIZE) .withThreadCount(ANY_MAX_NUM_THREADS) @@ -75,8 +74,8 @@ public class ConcurrentUpdateHttp2SolrClientBadInputTest extends SolrJettyTestBa } try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - SolrClient client = - new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client) + var client = + new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client) .withDefaultCollection(ANY_COLLECTION) .withQueueSize(ANY_QUEUE_SIZE) .withThreadCount(ANY_MAX_NUM_THREADS) diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientMultiCollectionTest.java similarity index 88% rename from solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java rename to solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientMultiCollectionTest.java index b5a0526a4db..ef8ae2927cc 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientMultiCollectionTest.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.apache.ConcurrentUpdateSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.SolrInputDocument; @@ -32,11 +31,10 @@ import org.junit.BeforeClass; import org.junit.Test; /** - * {@link ConcurrentUpdateSolrClient} reuses the same HTTP connection to send multiple requests. - * These tests ensure that this connection-reuse never results in documents being sent to the wrong - * collection. See SOLR-12803 + * CUSC reuses the same HTTP connection to send multiple requests. These tests ensure that this + * connection-reuse never results in documents being sent to the wrong collection. See SOLR-12803 */ -public class ConcurrentUpdateHttp2SolrClientMultiCollectionTest extends SolrCloudTestCase { +public class ConcurrentUpdateJettySolrClientMultiCollectionTest extends SolrCloudTestCase { private static final String COLLECTION_ONE_NAME = "collection1"; private static final String COLLECTION_TWO_NAME = "collection2"; @@ -68,8 +66,8 @@ public class ConcurrentUpdateHttp2SolrClientMultiCollectionTest extends SolrClou int numTotalDocs = 1000; int numExpectedPerCollection = numTotalDocs / 2; try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - SolrClient client = - new ConcurrentUpdateHttp2SolrClient.Builder(solrUrl, http2Client) + var client = + new ConcurrentUpdateJettySolrClient.Builder(solrUrl, http2Client) .withQueueSize(numTotalDocs) .build()) { splitDocumentsAcrossCollections(client, numTotalDocs); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientTest.java similarity index 94% rename from solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientTest.java rename to solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientTest.java index d202834dd83..39a1e75e400 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJettySolrClientTest.java @@ -52,7 +52,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { +public class ConcurrentUpdateJettySolrClientTest extends SolrJettyTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** Mock endpoint where the CUSS being tested in this class sends requests. */ @@ -189,7 +189,7 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { final StringBuilder errors = new StringBuilder(); try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = + var concurrentClient = new OutcomeCountingConcurrentUpdateSolrClient.Builder( serverUrl, http2Client, successCounter, errorCounter, errors) .withQueueSize(cussQueueSize) @@ -247,8 +247,8 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { int cussQueueSize = 10; try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = - (new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client)) + var concurrentClient = + (new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client)) .withQueueSize(cussQueueSize) .withThreadCount(cussThreadCount) .build()) { @@ -267,8 +267,8 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { } try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = - new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client) + var concurrentClient = + new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client) .withDefaultCollection(DEFAULT_TEST_CORENAME) .withQueueSize(cussQueueSize) .withThreadCount(cussThreadCount) @@ -289,8 +289,8 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { int expected = numDocs * numRunnables; try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = - new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client) + var concurrentClient = + new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client) .withQueueSize(cussQueueSize) .withThreadCount(cussThreadCount) .setPollQueueTime(0, TimeUnit.MILLISECONDS) @@ -326,8 +326,8 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { } try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build(); - ConcurrentUpdateHttp2SolrClient concurrentClient = - new ConcurrentUpdateHttp2SolrClient.Builder(getBaseUrl(), http2Client) + var concurrentClient = + new ConcurrentUpdateJettySolrClient.Builder(getBaseUrl(), http2Client) .withDefaultCollection(DEFAULT_TEST_CORENAME) .withQueueSize(cussQueueSize) .withThreadCount(cussThreadCount) @@ -338,7 +338,7 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { } } - static class OutcomeCountingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient { + static class OutcomeCountingConcurrentUpdateSolrClient extends ConcurrentUpdateJettySolrClient { private final AtomicInteger successCounter; private final AtomicInteger failureCounter; private final StringBuilder errors; @@ -362,7 +362,7 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { successCounter.incrementAndGet(); } - static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder { + static class Builder extends ConcurrentUpdateJettySolrClient.Builder { protected final AtomicInteger successCounter; protected final AtomicInteger failureCounter; protected final StringBuilder errors; @@ -397,7 +397,7 @@ public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase { var http2Client = new Http2SolrClient.Builder().withIdleTimeout(1, TimeUnit.MILLISECONDS).build(); var client = - new ConcurrentUpdateHttp2SolrClient.Builder( + new ConcurrentUpdateJettySolrClient.Builder( "http://" + localHost.getHostAddress() + ":" diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java index d95ef9dfbfa..856e2103abe 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java @@ -671,7 +671,8 @@ public class Http2SolrClientTest extends HttpSolrClientTestBase { try (Http2SolrClient onlyBaseUrlChangedClient = new Http2SolrClient.Builder(url).withHttpClient(oldClient).build()) { - assertEquals(oldClient.getIdleTimeout(), onlyBaseUrlChangedClient.getIdleTimeout()); + assertEquals( + oldClient.getIdleTimeoutMillis(), onlyBaseUrlChangedClient.getIdleTimeoutMillis()); assertEquals(oldClient.getHttpClient(), onlyBaseUrlChangedClient.getHttpClient()); } @@ -688,8 +689,9 @@ public class Http2SolrClientTest extends HttpSolrClientTestBase { .withHttpClient(oldClient) .withIdleTimeout(newIdleTimeoutMs, TimeUnit.MILLISECONDS) .build()) { - assertNotEquals(oldClient.getIdleTimeout(), idleTimeoutChangedClient.getIdleTimeout()); - assertEquals(newIdleTimeoutMs, idleTimeoutChangedClient.getIdleTimeout()); + assertNotEquals( + oldClient.getIdleTimeoutMillis(), idleTimeoutChangedClient.getIdleTimeoutMillis()); + assertEquals(newIdleTimeoutMs, idleTimeoutChangedClient.getIdleTimeoutMillis()); NamedList<Object> response = idleTimeoutChangedClient.request(req); try (InputStream is = (InputStream) response.get("stream")) { String expect = diff --git a/solr/test-framework/src/java/org/apache/solr/client/solrj/apache/ConcurrentUpdateSolrClient.java b/solr/test-framework/src/java/org/apache/solr/client/solrj/apache/ConcurrentUpdateSolrClient.java index d81314c05c8..09cdc17b51c 100644 --- a/solr/test-framework/src/java/org/apache/solr/client/solrj/apache/ConcurrentUpdateSolrClient.java +++ b/solr/test-framework/src/java/org/apache/solr/client/solrj/apache/ConcurrentUpdateSolrClient.java @@ -41,7 +41,7 @@ import org.apache.http.entity.EntityTemplate; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient; import org.apache.solr.client.solrj.impl.StallDetection; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; @@ -70,7 +70,7 @@ import org.slf4j.MDC; * to use ConcurrentUpdateSolrClient with /update requests. The class {@link HttpSolrClient} is * better suited for the query interface. * - * @deprecated Please use {@link ConcurrentUpdateHttp2SolrClient} + * @deprecated Please use {@link ConcurrentUpdateBaseSolrClient} */ @Deprecated(since = "9.0") public class ConcurrentUpdateSolrClient extends SolrClient {
