Copilot commented on code in PR #4050: URL: https://github.com/apache/solr/pull/4050#discussion_r2692368518
########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } + + @Override + protected StreamingResponse doSendUpdateStream(Update update) { + UpdateRequest req = update.request(); + String collection = update.collection(); + CompletableFuture<HttpResponse<InputStream>> resp = + client.requestInputStreamAsync(basePath, req, collection); + + return new StreamingResponse() { + + @Override + public int awaitResponse(long timeoutMillis) throws Exception { + return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode(); + } + + @Override + public InputStream getInputStream() { + try { + return resp.get().body(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return InputStream.nullInputStream(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getUnderlyingResponse() { + return resp; + } + + @Override + public void close() throws IOException { + getInputStream().close(); Review Comment: Potential resource leak: The close method calls getInputStream which may block or throw exceptions. If resp.get() is not yet complete when close is called, this could block indefinitely or throw an exception before closing the stream. Additionally, if getInputStream returns a null stream due to InterruptedException (as per the implementation), calling close on it may not clean up the underlying HTTP response resources. Consider checking the response completion state before attempting to get and close the input stream, or handle the case where the response may not be available yet. ```suggestion if (!resp.isDone()) { // Cancel the request if it has not completed yet to avoid blocking and free resources. resp.cancel(true); return; } HttpResponse<InputStream> httpResponse; try { httpResponse = resp.getNow(null); } catch (RuntimeException e) { // Response completed exceptionally; nothing to close here. return; } if (httpResponse == null) { return; } InputStream body = httpResponse.body(); if (body != null) { body.close(); } ``` ########## solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.SocketTimeoutException; +import java.net.http.HttpConnectTimeoutException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.embedded.JettyConfig; +import org.apache.solr.util.SolrJettyTestRule; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ConcurrentUpdateSolrClientTestBase extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs); + + public abstract ConcurrentUpdateBaseSolrClient concurrentClient( + HttpSolrClientBase solrClient, + String baseUrl, + String defaultCollection, + int queueSize, + int threadCount, + boolean disablePollQueue); + + public abstract ConcurrentUpdateBaseSolrClient outcomeCountingConcurrentClient( + String serverUrl, + int queueSize, + int threadCount, + HttpSolrClientBase solrClient, + AtomicInteger successCounter, + AtomicInteger failureCounter, + StringBuilder errors); + + /** Mock endpoint where the CUSS being tested in this class sends requests. */ + public static class TestServlet extends HttpServlet + implements JavaBinUpdateRequestCodec.StreamingUpdateHandler { + private static final long serialVersionUID = 1L; + + public static void clear() { + lastMethod = null; + headers = null; + parameters = null; + errorCode = null; + numReqsRcvd.set(0); + numDocsRcvd.set(0); + } + + public static Integer errorCode = null; + public static String lastMethod = null; + public static HashMap<String, String> headers = null; + public static Map<String, String[]> parameters = null; + public static AtomicInteger numReqsRcvd = new AtomicInteger(0); + public static AtomicInteger numDocsRcvd = new AtomicInteger(0); + + public static void setErrorCode(Integer code) { + errorCode = code; + } + + private void setHeaders(HttpServletRequest req) { + Enumeration<String> headerNames = req.getHeaderNames(); + headers = new HashMap<>(); + while (headerNames.hasMoreElements()) { + final String name = headerNames.nextElement(); + headers.put(name, req.getHeader(name)); + } + } + + private void setParameters(HttpServletRequest req) { + // parameters = req.getParameterMap(); Review Comment: The setParameters method body is commented out, which makes the method ineffective. If parameter tracking is not needed for these tests, the method should either be removed entirely or have a comment explaining why it's intentionally empty. If it was meant to be implemented but was left unfinished, this should be completed. ```suggestion parameters = req.getParameterMap(); ``` ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } + + @Override + protected StreamingResponse doSendUpdateStream(Update update) { + UpdateRequest req = update.request(); + String collection = update.collection(); + CompletableFuture<HttpResponse<InputStream>> resp = + client.requestInputStreamAsync(basePath, req, collection); + + return new StreamingResponse() { + + @Override + public int awaitResponse(long timeoutMillis) throws Exception { + return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode(); + } + + @Override + public InputStream getInputStream() { + try { + return resp.get().body(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return InputStream.nullInputStream(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getUnderlyingResponse() { + return resp; + } + + @Override + public void close() throws IOException { + getInputStream().close(); + } + }; + } Review Comment: Missing javadoc for the doSendUpdateStream method. This method implements a critical abstract method from the base class and should document its behavior, parameters, return value, and any exceptions it may throw. The StreamingResponse contract and how this implementation fulfills it should be clearly documented. ########## changelog/unreleased/SOLR-18065.yml: ########## @@ -0,0 +1,7 @@ +title: Added new ConcurrentUpdateJdkSolrClient that works with HttpJdkSolrClient Review Comment: The changelog entry uses "Added new" which is redundant. The "type: added" field already indicates this is a new feature, so the title should simply say "ConcurrentUpdateJdkSolrClient that works with HttpJdkSolrClient" instead of "Added new ConcurrentUpdateJdkSolrClient that works with HttpJdkSolrClient". ```suggestion title: ConcurrentUpdateJdkSolrClient that works with HttpJdkSolrClient ``` ########## solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.SocketTimeoutException; +import java.net.http.HttpConnectTimeoutException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.embedded.JettyConfig; +import org.apache.solr.util.SolrJettyTestRule; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ConcurrentUpdateSolrClientTestBase extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs); + + public abstract ConcurrentUpdateBaseSolrClient concurrentClient( + HttpSolrClientBase solrClient, + String baseUrl, + String defaultCollection, + int queueSize, + int threadCount, + boolean disablePollQueue); + + public abstract ConcurrentUpdateBaseSolrClient outcomeCountingConcurrentClient( + String serverUrl, + int queueSize, + int threadCount, + HttpSolrClientBase solrClient, + AtomicInteger successCounter, + AtomicInteger failureCounter, + StringBuilder errors); Review Comment: Missing javadoc for abstract methods in the test base class. The abstract methods solrClient, concurrentClient, and outcomeCountingConcurrentClient should have javadoc comments explaining their purpose, parameters, and return values. This would help developers implementing subclasses understand what these factory methods should do and what constraints they must satisfy. ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java: ########## @@ -143,6 +143,18 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil assert ObjectReleaseTracker.track(this); } + protected CompletableFuture<HttpResponse<InputStream>> requestInputStreamAsync( + String overrideBaseUrl, final SolrRequest<?> solrRequest, String collection) { + try { + PreparedRequest pReq = prepareRequest(solrRequest, collection, overrideBaseUrl); + return httpClient.sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream()); + } catch (Exception e) { + CompletableFuture<HttpResponse<InputStream>> cf = new CompletableFuture<>(); + cf.completeExceptionally(e); + return cf; + } + } Review Comment: Missing javadoc for the new protected method requestInputStreamAsync. This method is protected and likely intended for use by subclasses or related classes like ConcurrentUpdateJdkSolrClient. It should document its parameters, return value, and the contract around the returned CompletableFuture, including what exceptions might be wrapped in the future and when it might complete exceptionally. ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } + + @Override + protected StreamingResponse doSendUpdateStream(Update update) { + UpdateRequest req = update.request(); + String collection = update.collection(); + CompletableFuture<HttpResponse<InputStream>> resp = + client.requestInputStreamAsync(basePath, req, collection); + + return new StreamingResponse() { + + @Override + public int awaitResponse(long timeoutMillis) throws Exception { + return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode(); + } + + @Override + public InputStream getInputStream() { + try { + return resp.get().body(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return InputStream.nullInputStream(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getUnderlyingResponse() { + return resp; + } + + @Override + public void close() throws IOException { + getInputStream().close(); + } + }; + } + + public static class Builder extends ConcurrentUpdateBaseSolrClient.Builder { + /** + * @see org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String, + * HttpSolrClientBase) + */ + public Builder(String baseUrl, HttpJdkSolrClient client) { + + this(baseUrl, client, false); + // The base class uses idle timeout with StreamingResponse#awaitResponse so it needs to be + // set! + this.idleTimeoutMillis = 1000; Review Comment: The idleTimeoutMillis is hardcoded to 1000 milliseconds in the primary constructor. This overrides any value that might have been set on the HttpJdkSolrClient. Unlike the Jetty implementation which retrieves the idle timeout from the client (line 63 in ConcurrentUpdateJettySolrClient), this hardcoded value could cause unexpected behavior if users have configured a different timeout on their HttpJdkSolrClient. Consider retrieving the idle timeout from the client parameter instead of hardcoding it, or document why the hardcoded value is necessary for the JDK client. ```suggestion // set. Prefer the client's configured idle timeout when available, otherwise fall back to // the previous default of 1000 ms. long clientIdleTimeout = client.getIdleTimeout(); this.idleTimeoutMillis = clientIdleTimeout > 0 ? clientIdleTimeout : 1000; ``` ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } + + @Override + protected StreamingResponse doSendUpdateStream(Update update) { + UpdateRequest req = update.request(); + String collection = update.collection(); + CompletableFuture<HttpResponse<InputStream>> resp = + client.requestInputStreamAsync(basePath, req, collection); + + return new StreamingResponse() { + + @Override + public int awaitResponse(long timeoutMillis) throws Exception { + return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode(); + } + + @Override + public InputStream getInputStream() { + try { + return resp.get().body(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return InputStream.nullInputStream(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getUnderlyingResponse() { + return resp; + } + + @Override + public void close() throws IOException { + getInputStream().close(); + } + }; + } + + public static class Builder extends ConcurrentUpdateBaseSolrClient.Builder { + /** + * @see org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String, + * HttpSolrClientBase) + */ + public Builder(String baseUrl, HttpJdkSolrClient client) { + + this(baseUrl, client, false); + // The base class uses idle timeout with StreamingResponse#awaitResponse so it needs to be + // set! Review Comment: The comment on lines 84-86 explains why idleTimeoutMillis is set, but it would be more helpful if this explanation appeared as a proper javadoc comment on the constructor itself, or at least as a more detailed inline comment. The current comment is too terse and doesn't explain why the value 1000 was chosen specifically. ```suggestion * Create a {@link ConcurrentUpdateJdkSolrClient} builder for an existing {@link HttpJdkSolrClient}. * * <p>This constructor configures a default {@code idleTimeoutMillis} value used by the * {@link ConcurrentUpdateBaseSolrClient} base class when waiting in * {@code StreamingResponse#awaitResponse}. The idle timeout must be set explicitly for * the JDK-based HTTP client so that consumer threads do not block indefinitely while still * polling frequently enough to keep update latency low. A value of {@code 1000} milliseconds * (1 second) is chosen as a conservative default that avoids busy waiting while providing * reasonably responsive flushing of asynchronous update responses. * * @param baseUrl the base Solr URL this client will send requests to * @param client the {@link HttpJdkSolrClient} used to execute HTTP requests * @see org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String, * HttpSolrClientBase) */ public Builder(String baseUrl, HttpJdkSolrClient client) { this(baseUrl, client, false); // Ensure the base class has a sensible idle timeout for awaitResponse(). ``` ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } + + @Override + protected StreamingResponse doSendUpdateStream(Update update) { + UpdateRequest req = update.request(); + String collection = update.collection(); + CompletableFuture<HttpResponse<InputStream>> resp = + client.requestInputStreamAsync(basePath, req, collection); + + return new StreamingResponse() { + + @Override + public int awaitResponse(long timeoutMillis) throws Exception { + return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode(); + } + + @Override + public InputStream getInputStream() { + try { + return resp.get().body(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return InputStream.nullInputStream(); + } catch (ExecutionException e) { + throw new RuntimeException(e); Review Comment: The getInputStream method has inconsistent error handling. When an InterruptedException occurs, it correctly restores the interrupt status and returns a null input stream, but when an ExecutionException occurs, it throws a RuntimeException. This inconsistency could lead to unpredictable behavior. Consider handling both exceptions consistently, either by returning a null input stream for both cases or by documenting why ExecutionException should be fatal while InterruptedException is not. ```suggestion return InputStream.nullInputStream(); ``` ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.request.UpdateRequest; + +/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */ +public class ConcurrentUpdateJdkSolrClient extends ConcurrentUpdateBaseSolrClient { + + private final HttpJdkSolrClient client; + + protected ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) { + super(builder); + this.client = (HttpJdkSolrClient) builder.getClient(); + } Review Comment: Missing javadoc for the protected constructor. While the class itself has minimal documentation, the constructor should document its parameter and explain when this constructor would be used versus the public Builder pattern, especially since this is part of a public API that extends a base class. ########## solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.SocketTimeoutException; +import java.net.http.HttpConnectTimeoutException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.embedded.JettyConfig; +import org.apache.solr.util.SolrJettyTestRule; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ConcurrentUpdateSolrClientTestBase extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs); + + public abstract ConcurrentUpdateBaseSolrClient concurrentClient( + HttpSolrClientBase solrClient, + String baseUrl, + String defaultCollection, + int queueSize, + int threadCount, + boolean disablePollQueue); + + public abstract ConcurrentUpdateBaseSolrClient outcomeCountingConcurrentClient( + String serverUrl, + int queueSize, + int threadCount, + HttpSolrClientBase solrClient, Review Comment: Inconsistent parameter ordering in abstract methods. In concurrentClient, the solrClient parameter is first, but in outcomeCountingConcurrentClient, it's fourth (after serverUrl, queueSize, and threadCount). This inconsistency makes the API harder to use and could lead to confusion. Consider reordering the parameters in outcomeCountingConcurrentClient to match the pattern of concurrentClient for better consistency. ```suggestion HttpSolrClientBase solrClient, String serverUrl, int queueSize, int threadCount, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
