This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new f9d962c1232 Refactor CLI StreamTool SolrClientCache usage (#4513)
f9d962c1232 is described below
commit f9d962c123216799c64607ac6dcc6c323408b288
Author: David Smiley <[email protected]>
AuthorDate: Sun Jun 21 14:39:21 2026 -0400
Refactor CLI StreamTool SolrClientCache usage (#4513)
Signed-off-by: Eric Pugh <[email protected]>
Co-authored-by: Eric Pugh <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../src/java/org/apache/solr/cli/CLIUtils.java | 4 +-
.../src/java/org/apache/solr/cli/StreamTool.java | 132 +++++++++++----------
2 files changed, 71 insertions(+), 65 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
index 9e8cd9aa8cb..1a8f76c0db8 100644
--- a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
+++ b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
@@ -180,7 +180,7 @@ public final class CLIUtils {
* is used, and warns those users. In the future we'll have urls ending with
/api as well.
*
* @param solrUrl The user supplied url to Solr.
- * @return the solrUrl in the format that Solr expects to see internally.
+ * @return a URL without any path, e.g. {@code http://localhost:8983}
*/
public static String normalizeSolrUrl(String solrUrl) {
return normalizeSolrUrl(solrUrl, true);
@@ -192,7 +192,7 @@ public final class CLIUtils {
*
* @param solrUrl The user supplied url to Solr.
* @param logUrlFormatWarning If a warning message should be logged about
the url format
- * @return the solrUrl in the format that Solr expects to see internally.
+ * @return a URL without any path, e.g. {@code http://localhost:8983}
*/
public static String normalizeSolrUrl(String solrUrl, boolean
logUrlFormatWarning) {
if (solrUrl != null) {
diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java
b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
index 203291c53f0..31070e7e27b 100644
--- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java
+++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
@@ -38,7 +38,6 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.solr.client.solrj.io.Lang;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -46,13 +45,15 @@ import
org.apache.solr.client.solrj.io.stream.PushBackStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.handler.CatStream;
/** Supports stream command in the bin/solr script. */
@@ -62,8 +63,6 @@ public class StreamTool extends ToolBase {
super(runtime);
}
- private final SolrClientCache solrClientCache = new SolrClientCache();
-
@Override
public String getName() {
return "stream";
@@ -166,14 +165,30 @@ public class StreamTool extends ToolBase {
}
}
- PushBackStream pushBackStream;
- if (execution.equalsIgnoreCase("local")) {
- pushBackStream = doLocalMode(cli, expr);
- } else {
- pushBackStream = doRemoteMode(cli, expr);
+ // Validate inputs before opening any connection to Solr.
+ boolean local = execution.equalsIgnoreCase("local");
+ if (!local) {
+ if (!cli.hasOption(COLLECTION_OPTION)) {
+ throw new IllegalStateException(
+ "You must provide --name COLLECTION with --execution remote
parameter.");
+ }
+ if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
+ throw new IllegalStateException(
+ "The stdin() expression is only usable with --execution local.");
+ }
}
+ // a stream needs a context
+ StreamContext streamContext = createStreamContext(cli);
+ // create the stream
+ PushBackStream pushBackStream = null;
try {
+ if (local) {
+ pushBackStream = doLocalMode(expr, streamContext.getStreamFactory());
+ } else {
+ pushBackStream = doRemoteMode(expr, cli);
+ }
+ pushBackStream.setStreamContext(streamContext);
pushBackStream.open();
if (outputHeaders == null) {
@@ -226,36 +241,60 @@ public class StreamTool extends ToolBase {
}
}
} finally {
- pushBackStream.close();
- solrClientCache.close();
+ if (pushBackStream != null) {
+ pushBackStream.close();
+ }
+ streamContext.getSolrClientCache().close();
}
echoIfVerbose("StreamTool -- Done.");
}
+ private StreamContext createStreamContext(CommandLine cli) throws Exception {
+ var jettyClientBuilder = new HttpJettySolrClient.Builder();
+ String credentials =
cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
+ jettyClientBuilder.withOptionalBasicAuthCredentials(credentials);
+ HttpJettySolrClient client = jettyClientBuilder.build();
+
+ // subclass so we can ensure our client is closed when the cache is closed
+ var solrClientCache =
+ new SolrClientCache(client) {
+ @Override
+ public synchronized void close() {
+ super.close();
+ client.close();
+ }
+ };
+
+ try {
+ var solrConnection = CLIUtils.getSolrConnection(cli);
+ echoIfVerbose("Connecting to Solr at " + solrConnection);
+
+ StreamContext streamContext = new StreamContext();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ StreamFactory streamFactory = new DefaultStreamFactory();
+ streamFactory.withDefaultSolrConnection(solrConnection);
+ streamContext.setStreamFactory(streamFactory);
+ return streamContext;
+ } catch (Exception e) {
+ IOUtils.closeQuietly(solrClientCache);
+ throw e;
+ }
+ }
+
/**
* Runs a streaming expression in the local process of the CLI.
*
* <p>Running locally means that parallelization support or those
expressions requiring access to
* internal Solr capabilities will not function.
*
- * @param cli The CLI invoking the call
- * @param expr The streaming expression to be parsed and in the context of
the CLI process
+ * @param expr The streaming expression to be parsed and run in the context
of the CLI process
+ * @param streamFactory The factory used to construct the streaming
expression
* @return A connection to the streaming expression that receives Tuples as
they are emitted
* locally.
*/
- private PushBackStream doLocalMode(CommandLine cli, String expr) throws
Exception {
- var solrConnection = CLIUtils.getSolrConnection(cli);
- echoIfVerbose("Connecting to Solr at " + solrConnection.toString());
- solrClientCache.setBasicAuthCredentials(
- cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION));
- solrClientCache.getCloudSolrClient(solrConnection);
-
- TupleStream stream;
- PushBackStream pushBackStream;
-
- StreamExpression streamExpression = StreamExpressionParser.parse(expr);
- StreamFactory streamFactory = new StreamFactory();
+ private PushBackStream doLocalMode(String expr, StreamFactory streamFactory)
throws Exception {
// stdin is ONLY available in the local mode, not in the remote mode as it
// requires access to System.in
@@ -265,23 +304,7 @@ public class StreamTool extends ToolBase {
// logic about where to read data from.
streamFactory.withFunctionName("cat", LocalCatStream.class);
- streamFactory.withDefaultSolrConnection(solrConnection);
-
- Lang.register(streamFactory);
-
- assert streamExpression != null;
- stream = streamFactory.constructStream(streamExpression);
-
- pushBackStream = new PushBackStream(stream);
-
- // Now we can run the stream and return the results.
- StreamContext streamContext = new StreamContext();
- streamContext.setSolrClientCache(solrClientCache);
-
- // Output the headers
- pushBackStream.setStreamContext(streamContext);
-
- return pushBackStream;
+ return new PushBackStream(streamFactory.constructStream(expr));
}
/**
@@ -291,35 +314,18 @@ public class StreamTool extends ToolBase {
* <p>Running remotely allows you to use all the standard Streaming
Expression capabilities as the
* expression is running in a Solr environment.
*
- * @param cli The CLI invoking the call
* @param expr The streaming expression to be parsed and run remotely
+ * @param cli The CLI invoking the call
* @return A connection to the streaming expression that receives Tuples as
they are emitted from
* Solr /stream.
*/
- private PushBackStream doRemoteMode(CommandLine cli, String expr) throws
Exception {
+ private PushBackStream doRemoteMode(String expr, CommandLine cli) throws
Exception {
String solrUrl = CLIUtils.normalizeSolrUrl(cli);
- if (!cli.hasOption(COLLECTION_OPTION)) {
- throw new IllegalStateException(
- "You must provide --name COLLECTION with --execution remote
parameter.");
- }
String collection = cli.getOptionValue(COLLECTION_OPTION);
- if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
- throw new IllegalStateException(
- "The stdin() expression is only usable with --worker local set up.");
- }
-
- final SolrStream solrStream =
- new SolrStream(solrUrl + "/solr/" + collection, params("qt",
"/stream", "expr", expr));
-
- String credentials =
cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
- if (credentials != null) {
- String username = credentials.split(":")[0];
- String password = credentials.split(":")[1];
- solrStream.setCredentials(username, password);
- }
- return new PushBackStream(solrStream);
+ return new PushBackStream(
+ new SolrStream(solrUrl + "/solr/" + collection, params("qt",
"/stream", "expr", expr)));
}
private static ModifiableSolrParams params(String... params) {