This is an automated email from the ASF dual-hosted git repository.

epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b9e5392575 SOLR-14673: Add bin/solr stream CLI  (#2479)
4b9e5392575 is described below

commit 4b9e539257512801cf740d9bc142a95a99576103
Author: Eric Pugh <[email protected]>
AuthorDate: Wed Nov 13 15:32:46 2024 -0500

    SOLR-14673: Add bin/solr stream CLI  (#2479)
    
    * Allows you to run a streaming expression on the Solr server, using the /stream end point.
    * Allows you to run a streaming expression locally, by specifying --execution=local.
---
 solr/CHANGES.txt                                   |   2 +
 solr/bin/solr.cmd                                  |   4 +-
 .../core/src/java/org/apache/solr/cli/SolrCLI.java |   4 +-
 .../src/java/org/apache/solr/cli/StreamTool.java   | 531 +++++++++++++++++++++
 .../java/org/apache/solr/handler/CatStream.java    |   5 +-
 .../test/org/apache/solr/cli/StreamToolTest.java   | 366 ++++++++++++++
 solr/packaging/test/test_stream.bats               |  86 ++++
 .../modules/query-guide/pages/stream-tool.adoc     | 176 +++++++
 .../query-guide/pages/streaming-expressions.adoc   |   4 +
 .../modules/query-guide/querying-nav.adoc          |   1 +
 .../solr/client/solrj/io/SolrClientCache.java      |  29 +-
 .../solr/client/solrj/io/stream/LetStream.java     |   5 +
 12 files changed, 1197 insertions(+), 16 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 874adfcc4d7..93e737a543d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -12,6 +12,8 @@ New Features
 
 * SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode.  (Eric Pugh)
 
+* SOLR-14673: Solr CLI now has a bin/solr stream tool that executes streaming expressions via the command line, either locally or on a Solr cluster. (Eric Pugh)
+
 Improvements
 ---------------------
 
diff --git a/solr/bin/solr.cmd b/solr/bin/solr.cmd
index 5fd6ec44aea..9f875926517 100755
--- a/solr/bin/solr.cmd
+++ b/solr/bin/solr.cmd
@@ -1175,9 +1175,9 @@ for %%a in (%*) do (
    ) else (
       set "option!option!=%%a"
       if "!option!" equ "-s" set "SOLR_HOME=%%a"
-      if "!option!" equ "--solr-home" set "SOLR_HOME=%%a"        
+      if "!option!" equ "--solr-home" set "SOLR_HOME=%%a"
       if "!option!" equ "-d" set "SOLR_SERVER_DIR=%%a"
-      if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a"    
+      if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a"
       if not "!option!" equ "-s" if not "!option!" equ "--solr-home" if not "!option!" equ "-d" if not "!option!" equ "--server-dir" (
         set "AUTH_PARAMS=!AUTH_PARAMS! !option! %%a"
       )
diff --git a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
index 00a97b4434c..4714c43c99c 100755
--- a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
@@ -246,6 +246,7 @@ public class SolrCLI implements CLIO {
     else if ("post".equals(toolType)) return new PostTool();
     else if ("postlogs".equals(toolType)) return new PostLogsTool();
     else if ("version".equals(toolType)) return new VersionTool();
+    else if ("stream".equals(toolType)) return new StreamTool();
+    else if ("snapshot-create".equals(toolType)) return new SnapshotCreateTool();
+    else if ("snapshot-delete".equals(toolType)) return new SnapshotDeleteTool();
     else if ("snapshot-list".equals(toolType)) return new SnapshotListTool();
@@ -511,8 +512,7 @@ public class SolrCLI implements CLIO {
     print("Usage: solr COMMAND OPTIONS");
     print("       where COMMAND is one of: start, stop, restart, status, ");
     print(
-        "                                healthcheck, create, delete, auth, assert, config, export, api, package, post, ");
-
+        "                                healthcheck, create, delete, auth, assert, config, export, api, package, post, stream,");
     print(
         "                                zk ls, zk cp, zk rm , zk mv, zk mkroot, zk upconfig, zk downconfig,");
     print(
diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
new file mode 100644
index 00000000000..9c0392ec71b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cli;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.solr.client.solrj.io.Lang;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.PushBackStream;
+import org.apache.solr.client.solrj.io.stream.SolrStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.handler.CatStream;
+
+/** Supports stream command in the bin/solr script. */
+public class StreamTool extends ToolBase {
+
+  public StreamTool() {
+    this(CLIO.getOutStream());
+  }
+
+  public StreamTool(PrintStream stdout) {
+    super(stdout);
+  }
+
+  private final SolrClientCache solrClientCache = new SolrClientCache();
+
+  @Override
+  public String getName() {
+    return "stream";
+  }
+
+  @Override
+  public String getUsage() {
+    // Specify that the last argument is the streaming expression
+    return "bin/solr stream [--array-delimiter <CHARACTER>] [-c <NAME>] [--delimiter <CHARACTER>] [-e <ENVIRONMENT>] [-f\n"
+        + "       <FIELDS>] [-h] [--header] [-s <HOST>] [-u <credentials>] [-v] [-z <HOST>]  <streaming expression OR stream_file.expr>\n";
+  }
+
+  private static final Option EXECUTION_OPTION =
+      Option.builder("e")
+          .longOpt("execution")
+          .hasArg()
+          .argName("ENVIRONMENT")
+          .desc(
+              "Execution environment is either 'local' (i.e., the CLI process) or via a 'remote' Solr server. Default environment is 'remote'.")
+          .build();
+
+  private static final Option COLLECTION_OPTION =
+      Option.builder("c")
+          .longOpt("name")
+          .argName("NAME")
+          .hasArg()
+          .desc(
+              "Name of the specific collection to execute the expression on if the execution is set to 'remote'. Required for the 'remote' execution environment.")
+          .build();
+
+  private static final Option FIELDS_OPTION =
+      Option.builder("f")
+          .longOpt("fields")
+          .argName("FIELDS")
+          .hasArg()
+          .desc(
+              "The fields in the tuples to output. Defaults to the fields in the first tuple of the result set.")
+          .build();
+
+  private static final Option HEADER_OPTION =
+      Option.builder().longOpt("header").desc("Specify to include a header line.").build();
+
+  private static final Option DELIMITER_OPTION =
+      Option.builder()
+          .longOpt("delimiter")
+          .argName("CHARACTER")
+          .hasArg()
+          .desc("The output delimiter. Defaults to three spaces.")
+          .build();
+  private static final Option ARRAY_DELIMITER_OPTION =
+      Option.builder()
+          .longOpt("array-delimiter")
+          .argName("CHARACTER")
+          .hasArg()
+          .desc("The delimiter for multi-valued fields. Defaults to a pipe (|) delimiter.")
+          .build();
+
+  @Override
+  public Options getOptions() {
+
+    return super.getOptions()
+        .addOption(EXECUTION_OPTION)
+        .addOption(COLLECTION_OPTION)
+        .addOption(FIELDS_OPTION)
+        .addOption(HEADER_OPTION)
+        .addOption(DELIMITER_OPTION)
+        .addOption(ARRAY_DELIMITER_OPTION)
+        .addOption(CommonCLIOptions.CREDENTIALS_OPTION)
+        .addOptionGroup(getConnectionOptions());
+  }
+
+  @Override
+  @SuppressWarnings({"rawtypes"})
+  public void runImpl(CommandLine cli) throws Exception {
+
+    String expressionArgument = cli.getArgs()[0];
+    String execution = cli.getOptionValue(EXECUTION_OPTION, "remote");
+    String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION, "|");
+    String delimiter = cli.getOptionValue(DELIMITER_OPTION, "   ");
+    boolean includeHeaders = cli.hasOption(HEADER_OPTION);
+    String[] outputHeaders = getOutputFields(cli);
+
+    LineNumberReader bufferedReader = null;
+    String expr;
+    try {
+      Reader inputStream =
+          expressionArgument.toLowerCase(Locale.ROOT).endsWith(".expr")
+              ? new InputStreamReader(
+                  new FileInputStream(expressionArgument), Charset.defaultCharset())
+              : new StringReader(expressionArgument);
+
+      bufferedReader = new LineNumberReader(inputStream);
+      expr = StreamTool.readExpression(bufferedReader, cli.getArgs());
+      echoIfVerbose("Running Expression: " + expr);
+    } finally {
+      if (bufferedReader != null) {
+        bufferedReader.close();
+      }
+    }
+
+    PushBackStream pushBackStream;
+    if (execution.equalsIgnoreCase("local")) {
+      pushBackStream = doLocalMode(cli, expr);
+    } else {
+      pushBackStream = doRemoteMode(cli, expr);
+    }
+
+    try {
+      pushBackStream.open();
+
+      if (outputHeaders == null) {
+
+        Tuple tuple = pushBackStream.read();
+
+        if (!tuple.EOF) {
+          outputHeaders = getHeadersFromFirstTuple(tuple);
+        }
+
+        pushBackStream.pushBack(tuple);
+      }
+
+      if (includeHeaders) {
+        StringBuilder headersOut = new StringBuilder();
+        if (outputHeaders != null) {
+          for (int i = 0; i < outputHeaders.length; i++) {
+            if (i > 0) {
+              headersOut.append(delimiter);
+            }
+            headersOut.append(outputHeaders[i]);
+          }
+        }
+        CLIO.out(headersOut.toString());
+      }
+
+      while (true) {
+        Tuple tuple = pushBackStream.read();
+        if (tuple.EOF) {
+          break;
+        } else {
+          StringBuilder outLine = new StringBuilder();
+          if (outputHeaders != null) {
+            for (int i = 0; i < outputHeaders.length; i++) {
+              if (i > 0) {
+                outLine.append(delimiter);
+              }
+
+              Object o = tuple.get(outputHeaders[i]);
+              if (o != null) {
+                if (o instanceof List) {
+                  List outfields = (List) o;
+                  outLine.append(listToString(outfields, arrayDelimiter));
+                } else {
+                  outLine.append(o);
+                }
+              }
+            }
+          }
+          CLIO.out(outLine.toString());
+        }
+      }
+    } finally {
+
+      if (pushBackStream != null) {
+        pushBackStream.close();
+      }
+
+      solrClientCache.close();
+    }
+
+    echoIfVerbose("StreamTool -- Done.");
+  }
+
+  /**
+   * Runs a streaming expression in the local process of the CLI.
+   *
+   * <p>Running locally means that parallelization support or expressions requiring access to
+   * internal Solr capabilities will not function.
+   *
+   * @param cli The CLI invoking the call
+   * @param expr The streaming expression to be parsed and run in the context of the CLI process
+   * @return A connection to the streaming expression that receives Tuples as they are emitted
+   *     locally.
+   */
+  private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception {
+    String zkHost = SolrCLI.getZkHost(cli);
+
+    echoIfVerbose("Connecting to ZooKeeper at " + zkHost);
+    solrClientCache.getCloudSolrClient(zkHost);
+    solrClientCache.setBasicAuthCredentials(
+        cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION));
+
+    TupleStream stream;
+    PushBackStream pushBackStream;
+
+    StreamExpression streamExpression = StreamExpressionParser.parse(expr);
+    StreamFactory streamFactory = new StreamFactory();
+
+    // stdin is ONLY available in the local mode, not in the remote mode as it
+    // requires access to System.in
+    streamFactory.withFunctionName("stdin", StandardInStream.class);
+
+    // LocalCatStream extends CatStream and disables the Solr cluster specific
+    // logic about where to read data from.
+    streamFactory.withFunctionName("cat", LocalCatStream.class);
+
+    streamFactory.withDefaultZkHost(zkHost);
+
+    Lang.register(streamFactory);
+
+    stream = StreamTool.constructStream(streamFactory, streamExpression);
+
+    pushBackStream = new PushBackStream(stream);
+
+    // Now we can run the stream and return the results.
+    StreamContext streamContext = new StreamContext();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    // Output the headers
+    pushBackStream.setStreamContext(streamContext);
+
+    return pushBackStream;
+  }
+
+  /**
+   * Runs a streaming expression on a Solr collection via the /stream end point and returns the
+   * results to the CLI. Requires a collection to be specified to send the expression to.
+   *
+   * <p>Running remotely allows you to use all the standard Streaming Expression capabilities as the
+   * expression is running in a Solr environment.
+   *
+   * @param cli The CLI invoking the call
+   * @param expr The streaming expression to be parsed and run remotely
+   * @return A connection to the streaming expression that receives Tuples as they are emitted from
+   *     Solr /stream.
+   */
+  private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception {
+
+    String solrUrl = SolrCLI.normalizeSolrUrl(cli);
+    if (!cli.hasOption("name")) {
+      throw new IllegalStateException(
+          "You must provide --name COLLECTION when using the 'remote' execution environment.");
+    }
+    String collection = cli.getOptionValue("name");
+
+    if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
+      throw new IllegalStateException(
+          "The stdin() expression is only usable with --execution local.");
+    }
+
+    final SolrStream solrStream =
+        new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr));
+
+    String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
+    if (credentials != null) {
+      String username = credentials.split(":")[0];
+      String password = credentials.split(":")[1];
+      solrStream.setCredentials(username, password);
+    }
+    return new PushBackStream(solrStream);
+  }
+
+  private static ModifiableSolrParams params(String... params) {
+    if (params.length % 2 != 0) throw new RuntimeException("Params length should be even");
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i = 0; i < params.length; i += 2) {
+      msp.add(params[i], params[i + 1]);
+    }
+    return msp;
+  }
+
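+  /**
+   * A TupleStream that reads lines from standard input, emitting one tuple per line with a
+   * {@code line} field. It is only registered (as {@code stdin()}) in the local execution mode.
+   */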
+  public static class StandardInStream extends TupleStream implements Expressible {
+
+    private BufferedReader reader;
+    private InputStream inputStream = System.in;
+    private boolean doClose = false;
+
+    public StandardInStream() {}
+
+    public StandardInStream(StreamExpression expression, StreamFactory factory)
+        throws IOException {}
+
+    @Override
+    public List<TupleStream> children() {
+      return null;
+    }
+
+    public void setInputStream(InputStream inputStream) {
+      this.inputStream = inputStream;
+      this.doClose = true;
+    }
+
+    @Override
+    public void open() {
+      reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (doClose) {
+        inputStream.close();
+      }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override
+    public Tuple read() throws IOException {
+      String line = reader.readLine();
+      HashMap map = new HashMap();
+      Tuple tuple = new Tuple(map);
+      if (line != null) {
+        tuple.put("line", line);
+        tuple.put("file", "cat");
+      } else {
+        tuple.put("EOF", "true");
+      }
+      return tuple;
+    }
+
+    @Override
+    public void setStreamContext(StreamContext context) {}
+
+    @Override
+    public StreamExpression toExpression(StreamFactory factory) {
+      return null;
+    }
+
+    @Override
+    public Explanation toExplanation(StreamFactory factory) {
+      return null;
+    }
+
+    @Override
+    public StreamComparator getStreamSort() {
+      return null;
+    }
+  }
+
+  static String[] getOutputFields(CommandLine cli) {
+    if (cli.hasOption(FIELDS_OPTION)) {
+
+      String fl = cli.getOptionValue(FIELDS_OPTION);
+      String[] flArray = fl.split(",");
+      String[] outputHeaders = new String[flArray.length];
+
+      for (int i = 0; i < outputHeaders.length; i++) {
+        outputHeaders[i] = flArray[i].trim();
+      }
+
+      return outputHeaders;
+
+    } else {
+      return null;
+    }
+  }
+
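+  /**
+   * A {@link CatStream} variant for local execution that reads files directly from the local
+   * filesystem rather than from {@code $SOLR_HOME/userfiles/}, and does not require a Solr core in
+   * its stream context.
+   */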
+  public static class LocalCatStream extends CatStream {
+
+    public LocalCatStream(StreamExpression expression, StreamFactory factory) throws IOException {
+      super(expression, factory);
+    }
+
+    public LocalCatStream(String commaDelimitedFilepaths, int maxLines) {
+      super(commaDelimitedFilepaths, maxLines);
+    }
+
+    @Override
+    public void setStreamContext(StreamContext context) {
+      // LocalCatStream has no Solr core to pull from the context
+    }
+
+    @Override
+    protected List<CrawlFile> validateAndSetFilepathsInSandbox(String commaDelimitedFilepaths) {
+      final List<CrawlFile> crawlSeeds = new ArrayList<>();
+      for (String crawlRootStr : commaDelimitedFilepaths.split(",")) {
+        Path crawlRootPath = Paths.get(crawlRootStr).normalize();
+
+        if (!Files.exists(crawlRootPath)) {
+          throw new SolrException(
+              SolrException.ErrorCode.BAD_REQUEST,
+              "file/directory to stream doesn't exist: " + crawlRootStr);
+        }
+
+        crawlSeeds.add(new CrawlFile(crawlRootStr, crawlRootPath));
+      }
+
+      return crawlSeeds;
+    }
+  }
+
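+  /** Derives the output headers from the first tuple's field names, sorted alphabetically. */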
+  @SuppressWarnings({"rawtypes"})
+  static String[] getHeadersFromFirstTuple(Tuple tuple) {
+    Set fields = tuple.getFields().keySet();
+    String[] outputHeaders = new String[fields.size()];
+    int i = -1;
+    for (Object o : fields) {
+      outputHeaders[++i] = o.toString();
+    }
+    Arrays.sort(outputHeaders);
+    return outputHeaders;
+  }
+
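+  /** Joins the values of a multi-valued field into a single string using the given delimiter. */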
+  @SuppressWarnings({"rawtypes"})
+  static String listToString(List values, String internalDelim) {
+    StringBuilder buf = new StringBuilder();
+    for (Object value : values) {
+      if (buf.length() > 0) {
+        buf.append(internalDelim);
+      }
+
+      buf.append(value.toString());
+    }
+
+    return buf.toString();
+  }
+
+  private static TupleStream constructStream(
+      StreamFactory streamFactory, StreamExpression streamExpression) throws IOException {
+    return streamFactory.constructStream(streamExpression);
+  }
+
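+  /**
+   * Reads the streaming expression from the provided reader, skipping lines commented out with
+   * {@code #}, {@code //} or block comment markers, and substituting {@code $1}, {@code $2}, ...
+   * with the trailing command line arguments.
+   */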
+  static String readExpression(LineNumberReader bufferedReader, String[] args) throws IOException {
+
+    StringBuilder exprBuff = new StringBuilder();
+
+    boolean comment = false;
+    while (true) {
+      String line = bufferedReader.readLine();
+      if (line == null) {
+        break;
+      }
+
+      if (line.indexOf("/*") == 0) {
+        comment = true;
+        continue;
+      }
+
+      if (line.indexOf("*/") == 0) {
+        comment = false;
+        continue;
+      }
+
+      if (comment || line.startsWith("#") || line.startsWith("//")) {
+        continue;
+      }
+
+      // Substitute parameters
+
+      if (line.length() > 0) {
+        for (int i = 1; i < args.length; i++) {
+          String arg = args[i];
+          line = line.replace("$" + i, arg);
+        }
+      }
+
+      exprBuff.append(line);
+    }
+
+    return exprBuff.toString();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/CatStream.java b/solr/core/src/java/org/apache/solr/handler/CatStream.java
index 70ee2b65242..f2515f9b38b 100644
--- a/solr/core/src/java/org/apache/solr/handler/CatStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/CatStream.java
@@ -113,7 +113,8 @@ public class CatStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    final List<CrawlFile> initialCrawlSeeds = validateAndSetFilepathsInSandbox();
+    final List<CrawlFile> initialCrawlSeeds =
+        validateAndSetFilepathsInSandbox(this.commaDelimitedFilepaths);
 
     final List<CrawlFile> filesToCrawl = new ArrayList<>();
     for (CrawlFile crawlSeed : initialCrawlSeeds) {
@@ -163,7 +164,7 @@ public class CatStream extends TupleStream implements Expressible {
         .withExpression(toExpression(factory).toString());
   }
 
-  private List<CrawlFile> validateAndSetFilepathsInSandbox() {
+  protected List<CrawlFile> validateAndSetFilepathsInSandbox(String commaDelimitedFilepaths) {
     final List<CrawlFile> crawlSeeds = new ArrayList<>();
     for (String crawlRootStr : commaDelimitedFilepaths.split(",")) {
       Path crawlRootPath = chroot.resolve(crawlRootStr).normalize();
diff --git a/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java
new file mode 100644
index 00000000000..e91ab9e2d81
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cli;
+
+import static org.apache.solr.cli.SolrCLI.findTool;
+import static org.apache.solr.cli.SolrCLI.parseCmdLine;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.LineNumberReader;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.util.SecurityJson;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class StreamToolTest extends SolrCloudTestCase {
+
+  @BeforeClass
+  public static void setupClusterWithSecurityEnabled() throws Exception {
+    configureCluster(2).withSecurityJson(SecurityJson.SIMPLE).configure();
+  }
+
+  private <T extends SolrRequest<? extends SolrResponse>> T withBasicAuth(T req) {
+    req.setBasicAuthCredentials(SecurityJson.USER, SecurityJson.PASS);
+    return req;
+  }
+
+  @Test
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void testGetHeaderFromFirstTuple() {
+    Tuple tuple = new Tuple(new HashMap());
+    tuple.put("field1", "blah");
+    tuple.put("field2", "blah");
+    tuple.put("field3", "blah");
+
+    String[] headers = StreamTool.getHeadersFromFirstTuple(tuple);
+
+    assertEquals(headers.length, 3);
+    assertEquals(headers[0], "field1");
+    assertEquals(headers[1], "field2");
+    assertEquals(headers[2], "field3");
+  }
+
+  @Test
+  public void testGetOutputFields() {
+    String[] args =
+        new String[] {
+          "--fields", "field9, field2, field3, field4",
+        };
+    StreamTool streamTool = new StreamTool();
+    CommandLine cli = SolrCLI.processCommandLineArgs(streamTool, args);
+    String[] outputFields = StreamTool.getOutputFields(cli);
+    assert outputFields != null;
+    assertEquals(outputFields.length, 4);
+    assertEquals(outputFields[0], "field9");
+    assertEquals(outputFields[1], "field2");
+    assertEquals(outputFields[2], "field3");
+    assertEquals(outputFields[3], "field4");
+  }
+
+  @Test
+  public void testReadExpression() throws Exception {
+    // This covers parameter substitution and expanded comments support.
+
+    String[] args = {"file.expr", "one", "two", "three"};
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter buf = new PrintWriter(stringWriter);
+    buf.println("/*");
+    buf.println("Multi-line comment Comment...");
+    buf.println("*/");
+    buf.println("// Single line comment");
+    buf.println("# Single line comment");
+    buf.println("let(a=$1, b=$2,");
+    buf.println("search($3))");
+    buf.println(")");
+
+    String expr = stringWriter.toString();
+
+    LineNumberReader reader = new LineNumberReader(new StringReader(expr));
+    String finalExpression = StreamTool.readExpression(reader, args);
+    // Strip the comment and insert the params in order.
+    assertEquals(finalExpression, "let(a=one, b=two,search(three)))");
+  }
+
+  @Test
+  public void testReadExpression2() throws Exception {
+    // This covers parameter substitution and expanded comments support.
+
+    String[] args = {"file.expr", "id", "desc_s", "desc"};
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter buf = new PrintWriter(stringWriter);
+
+    buf.println("# Try me");
+    buf.println("search(my_collection,q='*:*',fl='$1, $2',sort='id $3')");
+
+    String expr = stringWriter.toString();
+
+    LineNumberReader reader = new LineNumberReader(new StringReader(expr));
+    String finalExpression = StreamTool.readExpression(reader, args);
+    // Strip the comment and insert the params in order.
+    assertEquals(finalExpression, "search(my_collection,q='*:*',fl='id, desc_s',sort='id desc')");
+  }
+
+  @Test
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void testReadStream() throws Exception {
+    StreamTool.StandardInStream inStream = new StreamTool.StandardInStream();
+    List<Tuple> tuples = new ArrayList();
+    try {
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter buf = new PrintWriter(stringWriter);
+
+      buf.println("one  two");
+      buf.println("three  four");
+      buf.println("five  six");
+
+      String expr = stringWriter.toString();
+      ByteArrayInputStream inputStream =
+          new ByteArrayInputStream(expr.getBytes(Charset.defaultCharset()));
+      inStream.setInputStream(inputStream);
+      inStream.open();
+      while (true) {
+        Tuple tuple = inStream.read();
+        if (tuple.EOF) {
+          break;
+        } else {
+          tuples.add(tuple);
+        }
+      }
+
+    } finally {
+      inStream.close();
+    }
+
+    assertEquals(tuples.size(), 3);
+
+    String line1 = tuples.get(0).getString("line");
+    String line2 = tuples.get(1).getString("line");
+    String line3 = tuples.get(2).getString("line");
+
+    assertEquals("one  two", line1);
+    assertEquals("three  four", line2);
+    assertEquals("five  six", line3);
+  }
+
+  @Test
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void testLocalCatStream() throws Exception {
+    File localFile = File.createTempFile("topLevel1", ".txt");
+    populateFileWithData(localFile.toPath());
+
+    StreamTool.LocalCatStream catStream =
+        new StreamTool.LocalCatStream(localFile.getAbsolutePath(), -1);
+    List<Tuple> tuples = new ArrayList();
+    try {
+      catStream.open();
+      while (true) {
+        Tuple tuple = catStream.read();
+        if (tuple.EOF) {
+          break;
+        } else {
+          tuples.add(tuple);
+        }
+      }
+
+    } finally {
+      catStream.close();
+    }
+
+    assertEquals(4, tuples.size());
+
+    for (int i = 0; i < 4; i++) {
+      Tuple t = tuples.get(i);
+      assertEquals(localFile.getName() + " line " + (i + 1), t.get("line"));
+      assertEquals(localFile.getAbsolutePath(), t.get("file"));
+    }
+  }
+
+  @Test
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void testListToString() {
+    List stuff = new ArrayList();
+    stuff.add("test1");
+    stuff.add(3);
+    stuff.add(111.32322);
+    stuff.add("test3");
+    String s = StreamTool.listToString(stuff, "|");
+    assertEquals("test1|3|111.32322|test3", s);
+  }
+
+  @Test
+  public void testStdInFailsWithRemoteWorker() throws Exception {
+    String expression = "echo(stdin())";
+
+    String[] args =
+        new String[] {
+          "stream",
+          "-e",
+          "remote",
+          "--name",
+          "fakeCollection",
+          "--verbose",
+          "--zk-host",
+          cluster.getZkClient().getZkServerAddress(),
+          expression
+        };
+    assertEquals(1, runTool(args));
+  }
+
+  @Test
+  public void testStdInSucceedsWithLocalWorker() throws Exception {
+    String expression = "echo(stdin())";
+
+    String[] args =
+        new String[] {
+          "stream",
+          "-e",
+          "local",
+          "-v",
+          "-z",
+          cluster.getZkClient().getZkServerAddress(),
+          expression
+        };
+    assertEquals(0, runTool(args));
+  }
+
+  @Test
+  public void testRunEchoStreamLocally() throws Exception {
+
+    String expression = "echo(Hello)";
+    File expressionFile = File.createTempFile("expression", ".EXPR");
+    FileWriter writer = new FileWriter(expressionFile, Charset.defaultCharset());
+    writer.write(expression);
+    writer.close();
+
+    // test passing in the file
+    // note that a simple echo run locally does not require a collection, though the ZooKeeper host is still provided.
+    String[] args = {
+      "stream",
+      "-e",
+      "local",
+      "--verbose",
+      "-zk-host",
+      cluster.getZkClient().getZkServerAddress(),
+      expressionFile.getAbsolutePath()
+    };
+
+    assertEquals(0, runTool(args));
+
+    // test passing in the expression directly
+    args =
+        new String[] {
+          "stream",
+          "--execution",
+          "local",
+          "--verbose",
+          "--zk-host",
+          cluster.getZkClient().getZkServerAddress(),
+          expression
+        };
+
+    assertEquals(0, runTool(args));
+  }
+
+  @Test
+  public void testRunEchoStreamRemotely() throws Exception {
+    String collectionName = "streamWorkerCollection";
+    withBasicAuth(CollectionAdminRequest.createCollection(collectionName, "_default", 1, 1))
+        .processAndWait(cluster.getSolrClient(), 10);
+    waitForState(
+        "Expected collection to be created with 1 shard and 1 replicas",
+        collectionName,
+        clusterShape(1, 1));
+
+    String expression = "echo(Hello)";
+    File expressionFile = File.createTempFile("expression", ".EXPR");
+    FileWriter writer = new FileWriter(expressionFile, Charset.defaultCharset());
+    writer.write(expression);
+    writer.close();
+
+    // test passing in the file
+    String[] args = {
+      "stream",
+      "-e",
+      "remote",
+      "-c",
+      collectionName,
+      "--verbose",
+      "-z",
+      cluster.getZkClient().getZkServerAddress(),
+      "--credentials",
+      SecurityJson.USER_PASS,
+      expressionFile.getAbsolutePath()
+    };
+
+    assertEquals(0, runTool(args));
+
+    // test passing in the expression directly
+    args =
+        new String[] {
+          "stream",
+          "--execution",
+          "remote",
+          "--name",
+          collectionName,
+          "--verbose",
+          "--zk-host",
+          cluster.getZkClient().getZkServerAddress(),
+          "--credentials",
+          SecurityJson.USER_PASS,
+          expression
+        };
+
+    assertEquals(0, runTool(args));
+  }
+
+  private int runTool(String[] args) throws Exception {
+    Tool tool = findTool(args);
+    assertTrue(tool instanceof StreamTool);
+    CommandLine cli = parseCmdLine(tool, args);
+    return tool.runTool(cli);
+  }
+
+  // Copied from StreamExpressionTest.java
+  private static void populateFileWithData(Path dataFile) throws Exception {
+    // Files.createFile(dataFile);
+    try (final BufferedWriter writer = Files.newBufferedWriter(dataFile, StandardCharsets.UTF_8)) {
+      for (int i = 1; i <= 4; i++) {
+        writer.write(dataFile.getFileName() + " line " + i);
+        writer.newLine();
+      }
+    }
+  }
+}
diff --git a/solr/packaging/test/test_stream.bats b/solr/packaging/test/test_stream.bats
new file mode 100644
index 00000000000..63145522c79
--- /dev/null
+++ b/solr/packaging/test/test_stream.bats
@@ -0,0 +1,86 @@
+#!/usr/bin/env bats
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load bats_helper
+
+setup_file() {
+  common_clean_setup
+  solr start -e techproducts
+  solr auth enable --type basicAuth --credentials name:password
+}
+
+teardown_file() {
+  common_setup
+  solr stop --all
+}
+
+setup() {
+  common_setup
+}
+
+teardown() {
+  # save a snapshot of SOLR_HOME for failed tests
+  save_home_on_failure
+}
+
+@test "searching solr via locally executed streaming expression" {
+  
+  local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+  echo 'search(techproducts,' > "${solr_stream_file}"
+  echo 'q="name:memory",' >> "${solr_stream_file}"
+  echo 'fl="name,price",' >> "${solr_stream_file}"
+  echo 'sort="price desc"' >> "${solr_stream_file}"
+  echo ')' >> "${solr_stream_file}"  
+  
+  run solr stream --execution local --header --credentials name:password ${solr_stream_file}
+
+  assert_output --partial 'name   price'
+  assert_output --partial 'CORSAIR  XMS'
+  refute_output --partial 'ERROR'
+}
+
+@test "searching solr via remotely executed streaming expression" {
+  
+  local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+  echo 'search(techproducts,' > "${solr_stream_file}"
+  echo 'q="name:memory",' >> "${solr_stream_file}"
+  echo 'fl="name,price",' >> "${solr_stream_file}"
+  echo 'sort="price desc"' >> "${solr_stream_file}"
+  echo ')' >> "${solr_stream_file}"
+  
+  run solr stream -e remote --name techproducts --solr-url http://localhost:${SOLR_PORT} --header --credentials name:password ${solr_stream_file}
+
+  assert_output --partial 'name   price'
+  assert_output --partial 'CORSAIR  XMS'
+  refute_output --partial 'ERROR'
+}
+
+@test "variable interpolation" {
+  
+  local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+  echo 'search(techproducts,' > "${solr_stream_file}"
+  echo 'q="name:$1",' >> "${solr_stream_file}"
+  echo 'fl="name,price",' >> "${solr_stream_file}"
+  echo 'sort="price $2"' >> "${solr_stream_file}"
+  echo ')' >> "${solr_stream_file}"
+  
+  run solr stream --execution local --header --credentials name:password ${solr_stream_file} apple asc
+
+  assert_output --partial 'name   price'
+  assert_output --partial 'Apple 60 GB iPod'
+  refute_output --partial 'ERROR'
+}
diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc
new file mode 100644
index 00000000000..20fe2458e42
--- /dev/null
+++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc
@@ -0,0 +1,176 @@
+= Stream Tool
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+The Stream tool allows you to run a xref:streaming-expressions.adoc[] in Solr and see the results from the command line.
+It is very similar to the xref:stream-screen.adoc[], but is part of the `bin/solr` CLI.
+Being a CLI, you can pipe content into it like other Unix-style tools, and you can also run many kinds of expressions locally.
+
+NOTE: The Stream Tool is classified as "experimental".
+It may change in backwards-incompatible ways as it evolves to cover additional functionality.
+
+To run it, open a terminal and enter:
+
+[,console]
+----
+$ bin/solr stream --header -c techproducts --delimiter=\| 'search(techproducts,q="name:memory",fl="name,price")'
+----
+
+This will run the provided streaming expression on the `techproducts` collection on your local Solr and produce:
+
+[,console]
+----
+name|price
+CORSAIR  XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual Channel Kit System Memory - Retail|185.0
+CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - Retail|74.99
+A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - OEM|
+----
+
+TIP: Notice how we used the pipe character (|) as the delimiter?  It required a backslash to escape it so it wouldn't be treated as a pipe by the shell.
+
+You can also specify a file with the suffix `.expr` containing your streaming expression.
+This is useful for longer expressions or if you are experiencing shell character-escaping issues with your expression.
+
+Assuming you have created the file `stream.expr` with the contents:
+
+----
+# Stream a search
+
+search(
+  techproducts,
+  q="name:memory",
+  fl="name,price",
+  sort="price desc"
+)
+----
+
+Then you can run it on the Solr collection `techproducts`, specifying you want a header row:
+
+[,console]
+----
+$ bin/solr stream --header -c techproducts stream.expr
+----
+
+And this will produce:
+
+[,console]
+----
+name   price
+CORSAIR  XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual Channel Kit System Memory - Retail   185.0
+CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - Retail   74.99
+A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - OEM
+----
+
+== Using the bin/solr stream Tool
+
+To use the tool you need to provide the streaming expression either inline as the last argument, or provide a file ending in `.expr` that contains the expression.
+
+The `--help` (or simply `-h`) option will output information on its usage (i.e., `bin/solr stream --help`):
+
+[source,plain]
+----
+usage: bin/solr stream [--array-delimiter <CHARACTER>] [-c <NAME>] [--delimiter <CHARACTER>] [-e <ENVIRONMENT>] [-f
+       <FIELDS>] [-h] [--header] [-s <HOST>] [-u <credentials>] [-v] [-z <HOST>]
+
+List of options:
+    --array-delimiter <CHARACTER>   The delimiter for multi-valued fields. Defaults to a pipe (|) delimiter.
+ -c,--name <NAME>                   Name of the specific collection to execute the expression on if the execution is set
+                                    to 'remote'. Required for the 'remote' execution environment.
+    --delimiter <CHARACTER>         The output delimiter. Defaults to three spaces.
+ -e,--execution <ENVIRONMENT>       Execution environment is either 'local' (i.e., the CLI process) or via a 'remote' Solr
+                                    server. Default environment is 'remote'.
+ -f,--fields <FIELDS>               The fields in the tuples to output. Defaults to the fields in the first tuple of the
+                                    result set.
+ -h,--help                          Print this message.
+    --header                        Specify to include a header line.
+ -s,--solr-url <HOST>               Base Solr URL, which can be used to determine the zk-host if that's not known;
+                                    defaults to: http://localhost:8983.
+ -u,--credentials <credentials>     Credentials in the format username:password. Example: --credentials solr:SolrRocks
+ -v,--verbose                       Enable verbose command output.
+ -z,--zk-host <HOST>                Zookeeper connection string; unnecessary if ZK_HOST is defined in solr.in.sh;
+                                    otherwise, defaults to localhost:9983.
+----
+
+== Examples Using bin/solr stream
+
+There are several ways to use `bin/solr stream`.
+This section presents a few examples.
+
+=== Executing Expression Locally
+
+Streaming Expressions are executed in the Solr cluster by default.
+However, there are use cases where you want to interact with data in your local environment, or even run a streaming expression independent of Solr.
+
+The Stream Tool allows you to specify `--execution local` to process the expression in the Solr CLI's JVM.
+
+However, "local" processing does not imply a networking sandbox.
+Many streaming expressions, such as `search` and `update`, will make network requests to remote Solr nodes if configured to do so, even in "local" execution mode.
+
+Assuming you have created the file `load_data.expr` with the contents:
+
+----
+# Index CSV File
+
+update(
+  gettingstarted,
+  parseCSV(
+    cat(./example/exampledocs/books.csv, maxLines=2)
+  )
+)
+----
+
+Running this expression will read in the local file and send the first two lines to the collection `gettingstarted`.
+
+TIP: Want to send data to a remote Solr?  Pass in `--solr-url http://solr.remote:8983`.
+
+
+[,console]
+----
+$ bin/solr stream --execution local --header load_data.expr
+----
+
+
+The Stream Tool adds some Streaming Expressions specifically for local use:
+
+* `stdin()` lets you pipe data directly into the streaming expression.
+* `cat()` allows you to read ANY file on your local system, as shown in the example below.  This is different from the xref:stream-source-reference.adoc#cat[`cat`] operator that runs in Solr, which only accesses `$SOLR_HOME/userfiles/`.
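+
+For example, assuming a local file `/tmp/books.csv` (a hypothetical path), the following prints its first five lines as tuples:
+
+[,console]
+----
+$ bin/solr stream -e local 'cat(/tmp/books.csv, maxLines=5)'
+----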
+
+Caveats:
+
+ * You don't get to use any of the parallelization support that is available when you run the expression on the cluster.
+ * Anything that requires Solr internals access won't work with the `--execution local` context.
+
+=== Piping data to an expression
+
+Index a CSV file into the `gettingstarted` collection.
+
+[,console]
+----
+$ cat example/exampledocs/books.csv | bin/solr stream -e local 'update(gettingstarted,parseCSV(stdin()))'
+----
+
+=== Variable interpolation
+
+You can do variable interpolation by including `$1`, `$2`, etc. in your streaming expression, and then passing those values as arguments.
+
+[,console]
+----
+$ bin/solr stream -c techproducts 'echo("$1")' "Hello World"
+Hello World
+----
+
+This also works when using `.expr` files.
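+
+For example, a `search.expr` file (a hypothetical name) could parameterize the query term and sort direction:
+
+----
+search(techproducts,
+  q="name:$1",
+  fl="name,price",
+  sort="price $2"
+)
+----
+
+Run it with the values to substitute passed as trailing arguments:
+
+[,console]
+----
+$ bin/solr stream -c techproducts search.expr memory desc
+----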
diff --git a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
index a9a6bf564aa..cc3e502a2bc 100644
--- a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
@@ -143,3 +143,7 @@ The xref:math-expressions.adoc[] has in depth coverage of visualization techniqu
 === Stream Screen
 
 * xref:stream-screen.adoc[]: Submit streaming expressions and see results and parsing explanations.
+
+=== Stream Tool
+
+* xref:stream-tool.adoc[]: Submit streaming expressions and see results via `bin/solr stream`.
diff --git a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
index aa3f0fbade7..973bd80f005 100644
--- a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
@@ -96,3 +96,4 @@
 ** xref:graph-traversal.adoc[]
 ** xref:stream-api.adoc[]
 ** xref:stream-screen.adoc[]
+** xref:stream-tool.adoc[]
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 45ce93c30c4..7550d1a35c4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -17,7 +17,6 @@
 package org.apache.solr.client.solrj.io;
 
 import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,14 +37,10 @@ import org.apache.solr.client.solrj.impl.SolrClientBuilder;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.URLUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** The SolrClientCache caches SolrClients, so they can be reused by different TupleStreams. */
 public class SolrClientCache implements Closeable {
 
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   // Set the floor for timeouts to 60 seconds.
   // Timeouts can be increased by setting the system properties defined below.
   private static final int MIN_TIMEOUT = 60000;
@@ -55,6 +50,8 @@ public class SolrClientCache implements Closeable {
   private static final int minSocketTimeout =
       Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
 
+  private String basicAuthCredentials = null; // Only supported with the http2SolrClient
+
   private final Map<String, SolrClient> solrClients = new HashMap<>();
   private final HttpClient apacheHttpClient;
   private final Http2SolrClient http2SolrClient;
@@ -77,6 +74,10 @@ public class SolrClientCache implements Closeable {
     this.http2SolrClient = http2SolrClient;
   }
 
+  public void setBasicAuthCredentials(String basicAuthCredentials) {
+    this.basicAuthCredentials = basicAuthCredentials;
+  }
+
   public void setDefaultZKHost(String zkHost) {
     if (zkHost != null) {
       zkHost = zkHost.split("/")[0];
@@ -101,11 +102,12 @@ public class SolrClientCache implements Closeable {
     String zkHostNoChroot = zkHost.split("/")[0];
     boolean canUseACLs =
         Optional.ofNullable(defaultZkHost.get()).map(zkHostNoChroot::equals).orElse(false);
+
     final CloudSolrClient client;
     if (apacheHttpClient != null) {
       client = newCloudLegacySolrClient(zkHost, apacheHttpClient, canUseACLs);
     } else {
-      client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs);
+      client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs, basicAuthCredentials);
     }
     solrClients.put(zkHost, client);
     return client;
@@ -129,12 +131,17 @@ public class SolrClientCache implements Closeable {
   }
 
   private static CloudHttp2SolrClient newCloudHttp2SolrClient(
-      String zkHost, Http2SolrClient http2SolrClient, boolean canUseACLs) {
+      String zkHost,
+      Http2SolrClient http2SolrClient,
+      boolean canUseACLs,
+      String basicAuthCredentials) {
     final List<String> hosts = List.of(zkHost);
     var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
     builder.canUseZkACLs(canUseACLs);
     // using internal builder to ensure the internal client gets closed
-    builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient));
+    builder =
+        builder.withInternalClientBuilder(
+            newHttp2SolrClientBuilder(null, http2SolrClient, basicAuthCredentials));
     var client = builder.build();
     try {
       client.connect();
@@ -163,7 +170,7 @@ public class SolrClientCache implements Closeable {
     if (apacheHttpClient != null) {
       client = newHttpSolrClient(baseUrl, apacheHttpClient);
     } else {
-      client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build();
+      client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient, basicAuthCredentials).build();
     }
     solrClients.put(baseUrl, client);
     return client;
@@ -190,7 +197,7 @@ public class SolrClientCache implements Closeable {
   }
 
   private static Http2SolrClient.Builder newHttp2SolrClientBuilder(
-      String url, Http2SolrClient http2SolrClient) {
+      String url, Http2SolrClient http2SolrClient, String basicAuthCredentials) {
     final var builder =
         (url == null || URLUtil.isBaseUrl(url)) // URL may be null here and set by caller
             ? new Http2SolrClient.Builder(url)
@@ -199,6 +206,8 @@ public class SolrClientCache implements Closeable {
     if (http2SolrClient != null) {
       builder.withHttpClient(http2SolrClient);
     }
+    builder.withOptionalBasicAuthCredentials(basicAuthCredentials);
+
     long idleTimeout = minSocketTimeout;
     if (builder.getIdleTimeoutMillis() != null) {
       idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis());
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
index 9576cf9658e..fc26a8972f7 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -224,4 +224,9 @@ public class LetStream extends TupleStream implements Expressible {
   public int getCost() {
     return 0;
   }
+
+  @SuppressWarnings({"rawtypes"})
+  public Map getLetParams() {
+    return this.letParams;
+  }
 }
