This is an automated email from the ASF dual-hosted git repository.

epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new f9d962c1232 Refactor CLI StreamTool SolrClientCache usage (#4513)
f9d962c1232 is described below

commit f9d962c123216799c64607ac6dcc6c323408b288
Author: David Smiley <[email protected]>
AuthorDate: Sun Jun 21 14:39:21 2026 -0400

    Refactor CLI StreamTool SolrClientCache usage (#4513)
    
    Signed-off-by: Eric Pugh <[email protected]>
    Co-authored-by: Eric Pugh <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 .../src/java/org/apache/solr/cli/CLIUtils.java     |   4 +-
 .../src/java/org/apache/solr/cli/StreamTool.java   | 132 +++++++++++----------
 2 files changed, 71 insertions(+), 65 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
index 9e8cd9aa8cb..1a8f76c0db8 100644
--- a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
+++ b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java
@@ -180,7 +180,7 @@ public final class CLIUtils {
    * is used, and warns those users. In the future we'll have urls ending with /api as well.
    *
    * @param solrUrl The user supplied url to Solr.
-   * @return the solrUrl in the format that Solr expects to see internally.
+   * @return a URL without any path, e.g. {@code http://localhost:8983}
    */
   public static String normalizeSolrUrl(String solrUrl) {
     return normalizeSolrUrl(solrUrl, true);
@@ -192,7 +192,7 @@ public final class CLIUtils {
    *
    * @param solrUrl The user supplied url to Solr.
    * @param logUrlFormatWarning If a warning message should be logged about the url format
-   * @return the solrUrl in the format that Solr expects to see internally.
+   * @return a URL without any path, e.g. {@code http://localhost:8983}
    */
   public static String normalizeSolrUrl(String solrUrl, boolean logUrlFormatWarning) {
     if (solrUrl != null) {
diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
index 203291c53f0..31070e7e27b 100644
--- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java
+++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
@@ -38,7 +38,6 @@ import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.solr.client.solrj.io.Lang;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -46,13 +45,15 @@ import org.apache.solr.client.solrj.io.stream.PushBackStream;
 import org.apache.solr.client.solrj.io.stream.SolrStream;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.handler.CatStream;
 
 /** Supports stream command in the bin/solr script. */
@@ -62,8 +63,6 @@ public class StreamTool extends ToolBase {
     super(runtime);
   }
 
-  private final SolrClientCache solrClientCache = new SolrClientCache();
-
   @Override
   public String getName() {
     return "stream";
@@ -166,14 +165,30 @@ public class StreamTool extends ToolBase {
       }
     }
 
-    PushBackStream pushBackStream;
-    if (execution.equalsIgnoreCase("local")) {
-      pushBackStream = doLocalMode(cli, expr);
-    } else {
-      pushBackStream = doRemoteMode(cli, expr);
+    // Validate inputs before opening any connection to Solr.
+    boolean local = execution.equalsIgnoreCase("local");
+    if (!local) {
+      if (!cli.hasOption(COLLECTION_OPTION)) {
+        throw new IllegalStateException(
+            "You must provide --name COLLECTION with --execution remote parameter.");
+      }
+      if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
+        throw new IllegalStateException(
+            "The stdin() expression is only usable with --execution local.");
+      }
     }
 
+    // a stream needs a context
+    StreamContext streamContext = createStreamContext(cli);
+    // create the stream
+    PushBackStream pushBackStream = null;
     try {
+      if (local) {
+        pushBackStream = doLocalMode(expr, streamContext.getStreamFactory());
+      } else {
+        pushBackStream = doRemoteMode(expr, cli);
+      }
+      pushBackStream.setStreamContext(streamContext);
       pushBackStream.open();
 
       if (outputHeaders == null) {
@@ -226,36 +241,60 @@ public class StreamTool extends ToolBase {
         }
       }
     } finally {
-      pushBackStream.close();
-      solrClientCache.close();
+      if (pushBackStream != null) {
+        pushBackStream.close();
+      }
+      streamContext.getSolrClientCache().close();
     }
 
     echoIfVerbose("StreamTool -- Done.");
   }
 
+  private StreamContext createStreamContext(CommandLine cli) throws Exception {
+    var jettyClientBuilder = new HttpJettySolrClient.Builder();
+    String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
+    jettyClientBuilder.withOptionalBasicAuthCredentials(credentials);
+    HttpJettySolrClient client = jettyClientBuilder.build();
+
+    // subclass so we can ensure our client is closed when the cache is closed
+    var solrClientCache =
+        new SolrClientCache(client) {
+          @Override
+          public synchronized void close() {
+            super.close();
+            client.close();
+          }
+        };
+
+    try {
+      var solrConnection = CLIUtils.getSolrConnection(cli);
+      echoIfVerbose("Connecting to Solr at " + solrConnection);
+
+      StreamContext streamContext = new StreamContext();
+      streamContext.setSolrClientCache(solrClientCache);
+
+      StreamFactory streamFactory = new DefaultStreamFactory();
+      streamFactory.withDefaultSolrConnection(solrConnection);
+      streamContext.setStreamFactory(streamFactory);
+      return streamContext;
+    } catch (Exception e) {
+      IOUtils.closeQuietly(solrClientCache);
+      throw e;
+    }
+  }
+
   /**
    * Runs a streaming expression in the local process of the CLI.
    *
    * <p>Running locally means that parallelization support or those expressions requiring access to
    * internal Solr capabilities will not function.
    *
-   * @param cli The CLI invoking the call
-   * @param expr The streaming expression to be parsed and in the context of the CLI process
+   * @param expr The streaming expression to be parsed and run in the context of the CLI process
+   * @param streamFactory The factory used to construct the streaming expression
    * @return A connection to the streaming expression that receives Tuples as they are emitted
    *     locally.
    */
-  private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception {
-    var solrConnection = CLIUtils.getSolrConnection(cli);
-    echoIfVerbose("Connecting to Solr at " + solrConnection.toString());
-    solrClientCache.setBasicAuthCredentials(
-        cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION));
-    solrClientCache.getCloudSolrClient(solrConnection);
-
-    TupleStream stream;
-    PushBackStream pushBackStream;
-
-    StreamExpression streamExpression = StreamExpressionParser.parse(expr);
-    StreamFactory streamFactory = new StreamFactory();
+  private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) throws Exception {
 
     // stdin is ONLY available in the local mode, not in the remote mode as it
     // requires access to System.in
@@ -265,23 +304,7 @@ public class StreamTool extends ToolBase {
     // logic about where to read data from.
     streamFactory.withFunctionName("cat", LocalCatStream.class);
 
-    streamFactory.withDefaultSolrConnection(solrConnection);
-
-    Lang.register(streamFactory);
-
-    assert streamExpression != null;
-    stream = streamFactory.constructStream(streamExpression);
-
-    pushBackStream = new PushBackStream(stream);
-
-    // Now we can run the stream and return the results.
-    StreamContext streamContext = new StreamContext();
-    streamContext.setSolrClientCache(solrClientCache);
-
-    // Output the headers
-    pushBackStream.setStreamContext(streamContext);
-
-    return pushBackStream;
+    return new PushBackStream(streamFactory.constructStream(expr));
   }
 
   /**
@@ -291,35 +314,18 @@ public class StreamTool extends ToolBase {
    * <p>Running remotely allows you to use all the standard Streaming Expression capabilities as the
    * expression is running in a Solr environment.
    *
-   * @param cli The CLI invoking the call
    * @param expr The streaming expression to be parsed and run remotely
+   * @param cli The CLI invoking the call
    * @return A connection to the streaming expression that receives Tuples as they are emitted from
    *     Solr /stream.
    */
-  private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception {
+  private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception {
 
     String solrUrl = CLIUtils.normalizeSolrUrl(cli);
-    if (!cli.hasOption(COLLECTION_OPTION)) {
-      throw new IllegalStateException(
-          "You must provide --name COLLECTION with --execution remote parameter.");
-    }
     String collection = cli.getOptionValue(COLLECTION_OPTION);
 
-    if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
-      throw new IllegalStateException(
-          "The stdin() expression is only usable with --worker local set up.");
-    }
-
-    final SolrStream solrStream =
-        new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr));
-
-    String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
-    if (credentials != null) {
-      String username = credentials.split(":")[0];
-      String password = credentials.split(":")[1];
-      solrStream.setCredentials(username, password);
-    }
-    return new PushBackStream(solrStream);
+    return new PushBackStream(
+        new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)));
   }
 
   private static ModifiableSolrParams params(String... params) {

Reply via email to