Added: 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java
URL: 
http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java?rev=1909097&view=auto
==============================================================================
--- 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java
 (added)
+++ 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java
 Wed Apr 12 14:35:38 2023
@@ -0,0 +1,691 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.slf4j.MDC;
+
/**
 * Load-balancing SolrClient that round-robins requests across a list of Solr base URLs.
 * Servers that fail with a retryable error are moved to a "zombie" pool and periodically
 * pinged until they recover. Copied/adapted from SolrJ's LBSolrClient for ManifoldCF
 * (CONNECTORS-1740); subclasses supply the per-URL client via {@link #getClient(String)}.
 */
public abstract class ModifiedLBSolrClient extends SolrClient {

  // HTTP status codes treated as transient: a failure with one of these moves the
  // server to the zombie pool and the request is retried elsewhere (unless the
  // request is non-retryable, e.g. an update or an admin request).
  protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
  private static final int CHECK_INTERVAL = 60 * 1000; // 1 minute between zombie-ping checks
  private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list

  // Keys to the maps are currently of the form "http://localhost:8983/solr",
  // which should be equivalent to HttpSolrServer.getBaseURL().
  private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
  // access to aliveServers should be synchronized on itself

  protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();

  // changes to aliveServers are reflected in this array, no need to synchronize
  private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];

  // Lazily started by startAliveCheckExecutor(); volatile for double-checked locking.
  private volatile ScheduledExecutorService aliveCheckExecutor;

  // Ping interval in milliseconds; mutable via setAliveCheckInterval().
  private int interval = CHECK_INTERVAL;
  // Round-robin counter for pickServer(); starts at -1 so the first pick is index 0.
  private final AtomicInteger counter = new AtomicInteger(-1);

  // Shared ping query used to probe zombie servers (match-all, zero rows).
  private static final SolrQuery solrQuery = new SolrQuery("*:*");
  protected volatile ResponseParser parser;
  protected volatile RequestWriter requestWriter;

  // Param keys that must be sent via the query string rather than the request body.
  protected Set<String> queryParams = new HashSet<>();

  static {
    solrQuery.setRows(0);
    /**
     * Default sort (if we don't supply a sort) is by score and since we request 0 rows any
     * sorting and scoring is not necessary. SolrQuery.DOCID schema-independently specifies
     * a non-scoring sort. <code>_docid_ asc</code> sort is efficient,
     * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort.
     */
    solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
    // not a top-level request, we are interested only in the server being sent to i.e. it
    // need not distribute our request to further servers
    solrQuery.setDistrib(false);
  }
+
+  protected static class ServerWrapper {
+    final String baseUrl;
+
+    // "standard" servers are used by default. They normally live in the alive 
list
+    // and move to the zombie list when unavailable. When they become 
available again,
+    // they move back to the alive list.
+    boolean standard = true;
+
+    int failedPings = 0;
+
+    ServerWrapper(final String baseUrl) {
+      this.baseUrl = baseUrl;
+    }
+
+    public String getBaseUrl() {
+      return baseUrl;
+    }
+
+    @Override
+    public String toString() {
+      return baseUrl;
+    }
+
+    @Override
+    public int hashCode() {
+      return baseUrl.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (this == obj)
+        return true;
+      if (!(obj instanceof ServerWrapper))
+        return false;
+      return baseUrl.equals(((ServerWrapper) obj).baseUrl);
+    }
+  }
+
+  protected static class ServerIterator {
+    String serverStr;
+    List<String> skipped;
+    int numServersTried;
+    Iterator<String> it;
+    Iterator<String> skippedIt;
+    String exceptionMessage;
+    long timeAllowedNano;
+    long timeOutTime;
+
+    final Map<String, ServerWrapper> zombieServers;
+    final Req req;
+
+    public ServerIterator(final Req req, final Map<String, ServerWrapper> 
zombieServers) {
+      this.it = req.getServers().iterator();
+      this.req = req;
+      this.zombieServers = zombieServers;
+      this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+      this.timeOutTime = System.nanoTime() + timeAllowedNano;
+      fetchNext();
+    }
+
+    public synchronized boolean hasNext() {
+      return serverStr != null;
+    }
+
+    private void fetchNext() {
+      serverStr = null;
+      if (req.numServersToTry != null && numServersTried > 
req.numServersToTry) {
+        exceptionMessage = "Time allowed to handle this request exceeded";
+        return;
+      }
+
+      while (it.hasNext()) {
+        serverStr = it.next();
+        serverStr = normalize(serverStr);
+        // if the server is currently a zombie, just skip to the next one
+        final ServerWrapper wrapper = zombieServers.get(serverStr);
+        if (wrapper != null) {
+          final int numDeadServersToTry = req.getNumDeadServersToTry();
+          if (numDeadServersToTry > 0) {
+            if (skipped == null) {
+              skipped = new ArrayList<>(numDeadServersToTry);
+              skipped.add(wrapper.getBaseUrl());
+            } else if (skipped.size() < numDeadServersToTry) {
+              skipped.add(wrapper.getBaseUrl());
+            }
+          }
+          continue;
+        }
+
+        break;
+      }
+      if (serverStr == null && skipped != null) {
+        if (skippedIt == null) {
+          skippedIt = skipped.iterator();
+        }
+        if (skippedIt.hasNext()) {
+          serverStr = skippedIt.next();
+        }
+      }
+    }
+
+    boolean isServingZombieServer() {
+      return skippedIt != null;
+    }
+
+    public synchronized String nextOrError() throws SolrServerException {
+      return nextOrError(null);
+    }
+
+    public synchronized String nextOrError(final Exception previousEx) throws 
SolrServerException {
+      String suffix = "";
+      if (previousEx == null) {
+        suffix = ":" + zombieServers.keySet();
+      }
+      // Skipping check time exceeded for the first request
+      if (numServersTried > 0 && isTimeExceeded(timeAllowedNano, timeOutTime)) 
{
+        throw new SolrServerException("Time allowed to handle this request 
exceeded" + suffix, previousEx);
+      }
+      if (serverStr == null) {
+        throw new SolrServerException("No live SolrServers available to handle 
this request" + suffix, previousEx);
+      }
+      numServersTried++;
+      if (req.getNumServersToTry() != null && numServersTried > 
req.getNumServersToTry()) {
+        throw new SolrServerException("No live SolrServers available to handle 
this request:" + " numServersTried=" + numServersTried + " numServersToTry=" + 
req.getNumServersToTry() + suffix,
+            previousEx);
+      }
+      final String rs = serverStr;
+      fetchNext();
+      return rs;
+    }
+  }
+
  // Req should be parameterized too, but that touches a whole lotta code
  /**
   * Bundles a SolrRequest with the ordered list of candidate server base URLs it may be
   * sent to, plus limits on how many live/dead servers to attempt.
   */
  public static class Req {
    protected SolrRequest<?> request;
    protected List<String> servers;
    // Defaults to servers.size(); see setNumDeadServersToTry.
    protected int numDeadServersToTry;
    // Optional cap on total attempts; null means "no explicit cap".
    private final Integer numServersToTry;

    public Req(final SolrRequest<?> request, final List<String> servers) {
      this(request, servers, null);
    }

    public Req(final SolrRequest<?> request, final List<String> servers, final Integer numServersToTry) {
      this.request = request;
      this.servers = servers;
      this.numDeadServersToTry = servers.size();
      this.numServersToTry = numServersToTry;
    }

    public SolrRequest<?> getRequest() {
      return request;
    }

    public List<String> getServers() {
      return servers;
    }

    /**
     * @return the number of dead servers to try if there are no live servers left
     */
    public int getNumDeadServersToTry() {
      return numDeadServersToTry;
    }

    /**
     * @param numDeadServersToTry The number of dead servers to try if there are no live
     *     servers left. Defaults to the number of servers in this request.
     */
    public void setNumDeadServersToTry(final int numDeadServersToTry) {
      this.numDeadServersToTry = numDeadServersToTry;
    }

    /** @return the attempt cap passed at construction, or null when unset */
    public Integer getNumServersToTry() {
      return numServersToTry;
    }
  }
+
+  public static class Rsp {
+    protected String server;
+    protected NamedList<Object> rsp;
+
+    /** The response from the server */
+    public NamedList<Object> getResponse() {
+      return rsp;
+    }
+
+    /** The server that returned the response */
+    public String getServer() {
+      return server;
+    }
+  }
+
+  public ModifiedLBSolrClient(final List<String> baseSolrUrls) {
+    if (!baseSolrUrls.isEmpty()) {
+      for (final String s : baseSolrUrls) {
+        final ServerWrapper wrapper = createServerWrapper(s);
+        aliveServers.put(wrapper.getBaseUrl(), wrapper);
+      }
+      updateAliveList();
+    }
+  }
+
+  protected void updateAliveList() {
+    synchronized (aliveServers) {
+      aliveServerList = aliveServers.values().toArray(new ServerWrapper[0]);
+    }
+  }
+
+  protected ServerWrapper createServerWrapper(final String baseUrl) {
+    return new ServerWrapper(baseUrl);
+  }
+
+  public Set<String> getQueryParams() {
+    return queryParams;
+  }
+
+  /**
+   * Expert Method.
+   *
+   * @param queryParams set of param keys to only send via the query string
+   */
+  public void setQueryParams(final Set<String> queryParams) {
+    this.queryParams = queryParams;
+  }
+
+  public void addQueryParams(final String queryOnlyParam) {
+    this.queryParams.add(queryOnlyParam);
+  }
+
+  public static String normalize(String server) {
+    if (server.endsWith("/"))
+      server = server.substring(0, server.length() - 1);
+    return server;
+  }
+
  /**
   * Tries to query a live server from the list provided in Req. Servers in the dead pool
   * are skipped. If a request fails due to an IOException, the server is moved to the
   * dead pool for a certain period of time, or until a test request on that server
   * succeeds.
   *
   * <p>
   * Servers are queried in the exact order given (except servers currently in the dead
   * pool are skipped). If no live servers from the provided list remain to be tried, a
   * number of previously skipped dead servers will be tried.
   * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
   *
   * <p>
   * If no live servers are found a SolrServerException is thrown.
   *
   * @param req contains both the request as well as the list of servers to query
   * @return the result of the request
   * @throws IOException If there is a low-level I/O error.
   */
  public Rsp request(final Req req) throws SolrServerException, IOException {
    final Rsp rsp = new Rsp();
    Exception ex = null;
    // Updates and admin requests are only retried on connection-level failures, never
    // on HTTP error codes; see doRequest().
    final boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
    final ServerIterator serverIterator = new ServerIterator(req, zombieServers);
    String serverStr;
    // nextOrError() either returns a non-null URL or throws, so the loop exits only by
    // returning a successful response or propagating a SolrServerException.
    while ((serverStr = serverIterator.nextOrError(ex)) != null) {
      try {
        // Tag log output with the URL being attempted for this iteration.
        MDC.put("ModifiedLBSolrClient.url", serverStr);
        ex = doRequest(serverStr, req, rsp, isNonRetryable, serverIterator.isServingZombieServer());
        if (ex == null) {
          return rsp; // SUCCESS
        }
      } finally {
        MDC.remove("ModifiedLBSolrClient.url");
      }
    }
    throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
  }
+
+  /**
+   * @return time allowed in nanos, returns -1 if no time_allowed is specified.
+   */
+  private static long getTimeAllowedInNanos(final SolrRequest<?> req) {
+    final SolrParams reqParams = req.getParams();
+    return reqParams == null ? -1 : 
TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), 
TimeUnit.MILLISECONDS);
+  }
+
+  private static boolean isTimeExceeded(final long timeAllowedNano, final long 
timeOutTime) {
+    return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
+  }
+
  /**
   * Executes {@code req} against a single server and classifies the outcome: returns
   * null on success, returns the failure (after zombie-marking the server) when the
   * error is retryable, and rethrows when the error should abort the whole request.
   *
   * @param baseUrl        the server to send the request to
   * @param req            the request plus its server list
   * @param rsp            out-param; populated with the server and response on success
   * @param isNonRetryable true for updates/admin requests, which only retry on
   *                       connection-level failures
   * @param isZombie       true when baseUrl was taken from the zombie pool
   * @return null on success, otherwise the exception to feed into the retry loop
   */
  protected Exception doRequest(final String baseUrl, final Req req, final Rsp rsp, final boolean isNonRetryable, final boolean isZombie) throws SolrServerException, IOException {
    Exception ex = null;
    try {
      rsp.server = baseUrl;
      req.getRequest().setBasePath(baseUrl);
      rsp.rsp = getClient(baseUrl).request(req.getRequest(), (String) null);
      if (isZombie) {
        // The zombie answered successfully: resurrect it.
        zombieServers.remove(baseUrl);
      }
    } catch (final BaseHttpSolrClient.RemoteExecutionException e) {
      // Remote execution errors are never retryable on another server.
      throw e;
    } catch (final SolrException e) {
      // we retry on 404 or 403 or 503 or 500
      // unless it's an update - then we only retry on connect exception
      if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
        ex = (!isZombie) ? addZombie(baseUrl, e) : e;
      } else {
        // Server is alive but the request was likely malformed or invalid
        if (isZombie) {
          zombieServers.remove(baseUrl);
        }
        throw e;
      }
    } catch (final SocketException e) {
      if (!isNonRetryable || e instanceof ConnectException) {
        ex = (!isZombie) ? addZombie(baseUrl, e) : e;
      } else {
        throw e;
      }
    } catch (final SocketTimeoutException e) {
      if (!isNonRetryable) {
        ex = (!isZombie) ? addZombie(baseUrl, e) : e;
      } else {
        throw e;
      }
    } catch (final SolrServerException e) {
      final Throwable rootCause = e.getRootCause();
      // I/O failures are retryable for queries; updates only retry when the root
      // cause shows the connection was never established (ConnectException).
      if (!isNonRetryable && rootCause instanceof IOException) {
        ex = (!isZombie) ? addZombie(baseUrl, e) : e;
      } else if (isNonRetryable && rootCause instanceof ConnectException) {
        ex = (!isZombie) ? addZombie(baseUrl, e) : e;
      } else {
        throw e;
      }
    } catch (final Exception e) {
      // Anything unexpected aborts the request, wrapped for the caller.
      throw new SolrServerException(e);
    }

    return ex;
  }
+
  /** Returns the concrete SolrClient to use for the given base URL; supplied by subclasses. */
  protected abstract SolrClient getClient(String baseUrl);

  /**
   * Marks a server as dead and starts the background alive-check, returning {@code e}
   * so callers can chain it into the retry loop. The wrapper is flagged non-standard:
   * this path is used for per-request server lists, so recovered servers are simply
   * dropped from the zombie pool rather than added to the configured alive list
   * (configured servers go through moveAliveToDead instead, which keeps their wrapper).
   */
  protected Exception addZombie(final String serverStr, final Exception e) {
    final ServerWrapper wrapper = createServerWrapper(serverStr);
    wrapper.standard = false;
    zombieServers.put(serverStr, wrapper);
    startAliveCheckExecutor();
    return e;
  }
+
  /**
   * LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is
   * alive. Use this to set that interval
   *
   * @param interval time in milliseconds
   * @throws IllegalArgumentException if the interval is zero or negative
   */
  public void setAliveCheckInterval(final int interval) {
    if (interval <= 0) {
      throw new IllegalArgumentException("Alive check interval must be " + "positive, specified value = " + interval);
    }
    this.interval = interval;
  }

  /** Lazily starts the single-threaded zombie-ping scheduler (at most once). */
  private void startAliveCheckExecutor() {
    // double-checked locking, but it's OK because we don't *do* anything with
    // aliveCheckExecutor if it's not null.
    // (aliveCheckExecutor is volatile, so the publication is safe.)
    if (aliveCheckExecutor == null) {
      synchronized (this) {
        if (aliveCheckExecutor == null) {
          aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("aliveCheckExecutor"));
          // WeakReference lets this client be GC'd even while the scheduler thread
          // is still alive; the runner no-ops once the referent is collected.
          aliveCheckExecutor.scheduleAtFixedRate(getAliveCheckRunner(new WeakReference<>(this)), this.interval, this.interval, TimeUnit.MILLISECONDS);
        }
      }
    }
  }
+
+  private static Runnable getAliveCheckRunner(final 
WeakReference<ModifiedLBSolrClient> lbRef) {
+    return () -> {
+      final ModifiedLBSolrClient lb = lbRef.get();
+      if (lb != null && lb.zombieServers != null) {
+        for (final Object zombieServer : lb.zombieServers.values()) {
+          lb.checkAZombieServer((ServerWrapper) zombieServer);
+        }
+      }
+    };
+  }
+
+  public ResponseParser getParser() {
+    return parser;
+  }
+
+  /**
+   * Changes the {@link ResponseParser} that will be used for the internal 
SolrServer objects.
+   *
+   * @param parser Default Response Parser chosen to parse the response if the 
parser were not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(final ResponseParser parser) {
+    this.parser = parser;
+  }
+
+  /**
+   * Changes the {@link RequestWriter} that will be used for the internal 
SolrServer objects.
+   *
+   * @param requestWriter Default RequestWriter, used to encode requests sent 
to the server.
+   */
+  public void setRequestWriter(final RequestWriter requestWriter) {
+    this.requestWriter = requestWriter;
+  }
+
+  public RequestWriter getRequestWriter() {
+    return requestWriter;
+  }
+
  /**
   * Pings a single zombie server with the shared match-all/zero-rows query. On success
   * the server is removed from the zombie pool and, if it belongs to the configured
   * ("standard") set, restored to the alive list. On failure the ping counter is bumped
   * and non-standard zombies are dropped after NONSTANDARD_PING_LIMIT failures.
   */
  private void checkAZombieServer(final ServerWrapper zombieServer) {
    try {
      final QueryRequest queryRequest = new QueryRequest(solrQuery);
      queryRequest.setBasePath(zombieServer.baseUrl);
      final QueryResponse resp = queryRequest.process(getClient(zombieServer.getBaseUrl()));
      if (resp.getStatus() == 0) {
        // server has come back up.
        // make sure to remove from zombies before adding to alive to avoid a race condition
        // where another thread could mark it down, move it back to zombie, and then we delete
        // from zombie and lose it forever.
        final ServerWrapper wrapper = zombieServers.remove(zombieServer.getBaseUrl());
        if (wrapper != null) {
          wrapper.failedPings = 0;
          if (wrapper.standard) {
            addToAlive(wrapper);
          }
        } else {
          // something else already moved the server from zombie to alive
        }
      }
    } catch (final Exception e) {
      // Expected. The server is still down.
      zombieServer.failedPings++;

      // If the server doesn't belong in the standard set belonging to this load balancer
      // then simply drop it after a certain number of failed pings.
      if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
        zombieServers.remove(zombieServer.getBaseUrl());
      }
    }
  }
+
+  private ServerWrapper removeFromAlive(final String key) {
+    synchronized (aliveServers) {
+      final ServerWrapper wrapper = aliveServers.remove(key);
+      if (wrapper != null)
+        updateAliveList();
+      return wrapper;
+    }
+  }
+
+  private void addToAlive(final ServerWrapper wrapper) {
+    synchronized (aliveServers) {
+      final ServerWrapper prev = aliveServers.put(wrapper.getBaseUrl(), 
wrapper);
+      // TODO: warn if there was a previous entry?
+      updateAliveList();
+    }
+  }
+
+  public void addSolrServer(final String server) throws MalformedURLException {
+    addToAlive(createServerWrapper(server));
+  }
+
+  public String removeSolrServer(String server) {
+    try {
+      server = new URL(server).toExternalForm();
+    } catch (final MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+    if (server.endsWith("/")) {
+      server = server.substring(0, server.length() - 1);
+    }
+
+    // there is a small race condition here - if the server is in the process 
of being moved between
+    // lists, we could fail to remove it.
+    removeFromAlive(server);
+    zombieServers.remove(server);
+    return null;
+  }
+
  /**
   * Tries to query a live server. A SolrServerException is thrown if all servers are
   * dead. If the request failed due to IOException then the live server is moved to dead
   * pool and the request is retried on another live server. After live servers are
   * exhausted, any servers previously marked as dead will be tried before failing the
   * request.
   *
   * @param request the SolrRequest.
   * @return response
   * @throws IOException If there is a low-level I/O error.
   */
  @Override
  public NamedList<Object> request(final SolrRequest<?> request, final String collection) throws SolrServerException, IOException {
    // Delegate with no cap on the number of servers to try.
    return request(request, collection, null);
  }
+
  /**
   * Core load-balanced request loop over the configured alive servers: round-robins up
   * to {@code maxTries} attempts, demoting servers whose failures are I/O-rooted, then
   * falls back to retrying standard zombie servers (excluding ones that just failed)
   * before giving up.
   *
   * @param request         the request to execute
   * @param collection      target collection, may be null
   * @param numServersToTry cap on attempts; null means "one attempt per alive server"
   * @return the response from the first server that answers
   * @throws SolrServerException when all candidates fail or the time budget is spent
   * @throws IOException on low-level I/O errors
   */
  public NamedList<Object> request(final SolrRequest<?> request, final String collection, final Integer numServersToTry) throws SolrServerException, IOException {
    Exception ex = null;
    // Snapshot the volatile array once so the loop sees a consistent server list.
    final ServerWrapper[] serverList = aliveServerList;

    final int maxTries = (numServersToTry == null ? serverList.length : numServersToTry.intValue());
    int numServersTried = 0;
    Map<String, ServerWrapper> justFailed = null;

    boolean timeAllowedExceeded = false;
    final long timeAllowedNano = getTimeAllowedInNanos(request);
    final long timeOutTime = System.nanoTime() + timeAllowedNano;
    for (int attempts = 0; attempts < maxTries; attempts++) {
      // Intentional assignment-in-condition: records whether the break was a timeout.
      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
        break;
      }

      final ServerWrapper wrapper = pickServer(serverList, request);
      try {
        ++numServersTried;
        request.setBasePath(wrapper.baseUrl);
        return getClient(wrapper.getBaseUrl()).request(request, collection);
      } catch (final SolrException e) {
        // Server is alive but the request was malformed or invalid
        throw e;
      } catch (final SolrServerException e) {
        if (e.getRootCause() instanceof IOException) {
          // I/O-rooted failure: demote the server and remember it so the zombie
          // fallback below doesn't immediately retry it.
          ex = e;
          moveAliveToDead(wrapper);
          if (justFailed == null)
            justFailed = new HashMap<>();
          justFailed.put(wrapper.getBaseUrl(), wrapper);
        } else {
          throw e;
        }
      } catch (final Exception e) {
        throw new SolrServerException(e);
      }
    }

    // try other standard servers that we didn't try just now
    for (final ServerWrapper wrapper : zombieServers.values()) {
      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
        break;
      }

      if (wrapper.standard == false || justFailed != null && justFailed.containsKey(wrapper.getBaseUrl()))
        continue;
      try {
        ++numServersTried;
        request.setBasePath(wrapper.baseUrl);
        final NamedList<Object> rsp = getClient(wrapper.baseUrl).request(request, collection);
        // remove from zombie list *before* adding to alive to avoid a race that could lose a server
        zombieServers.remove(wrapper.getBaseUrl());
        addToAlive(wrapper);
        return rsp;
      } catch (final SolrException e) {
        // Server is alive but the request was malformed or invalid
        throw e;
      } catch (final SolrServerException e) {
        if (e.getRootCause() instanceof IOException) {
          ex = e;
          // still dead
        } else {
          throw e;
        }
      } catch (final Exception e) {
        throw new SolrServerException(e);
      }
    }

    // Compose the most specific failure message available.
    final String solrServerExceptionMessage;
    if (timeAllowedExceeded) {
      solrServerExceptionMessage = "Time allowed to handle this request exceeded";
    } else {
      if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
        solrServerExceptionMessage = "No live SolrServers available to handle this request:" + " numServersTried=" + numServersTried + " numServersToTry=" + numServersToTry.intValue();
      } else {
        solrServerExceptionMessage = "No live SolrServers available to handle this request";
      }
    }
    if (ex == null) {
      throw new SolrServerException(solrServerExceptionMessage);
    } else {
      throw new SolrServerException(solrServerExceptionMessage, ex);
    }
  }
+
  /**
   * Pick a server from list to execute request. By default servers are picked in
   * round-robin manner, custom classes can override this method for more advance logic
   *
   * @param aliveServerList list of currently alive servers
   * @param request         the request will be sent to the picked server
   * @return the picked server
   */
  protected ServerWrapper pickServer(final ServerWrapper[] aliveServerList, final SolrRequest<?> request) {
    // Mask the sign bit so the index stays non-negative after counter wrap-around.
    final int count = counter.incrementAndGet() & Integer.MAX_VALUE;
    // NOTE(review): throws ArithmeticException when aliveServerList is empty; callers
    // guard via maxTries, but overriding subclasses should be aware.
    return aliveServerList[count % aliveServerList.length];
  }

  /** Demotes an alive server to the zombie pool and ensures the ping scheduler runs. */
  private void moveAliveToDead(ServerWrapper wrapper) {
    wrapper = removeFromAlive(wrapper.getBaseUrl());
    if (wrapper == null)
      return; // another thread already detected the failure and removed it
    zombieServers.put(wrapper.getBaseUrl(), wrapper);
    startAliveCheckExecutor();
  }
+
  /** Stops the zombie-ping scheduler, if one was ever started. Safe to call repeatedly. */
  @Override
  public void close() {
    synchronized (this) {
      if (aliveCheckExecutor != null) {
        aliveCheckExecutor.shutdownNow();
        // Wait for in-flight ping tasks to terminate before returning.
        ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor);
      }
    }
  }
+
+}

Added: 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java
URL: 
http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java?rev=1909097&view=auto
==============================================================================
--- 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java
 (added)
+++ 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java
 Wed Apr 12 14:35:38 2023
@@ -0,0 +1,137 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.PreemptiveAuth;
+import 
org.apache.solr.client.solrj.impl.PreemptiveBasicAuthClientBuilderFactory;
+import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
+import org.apache.solr.client.solrj.util.SolrBasicAuthentication;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
+import org.eclipse.jetty.client.HttpAuthenticationStore;
+import org.eclipse.jetty.client.ProxyAuthenticationProtocolHandler;
+import org.eclipse.jetty.client.WWWAuthenticationProtocolHandler;
+
+public class ModifiedPreemptiveBasicAuthClientBuilderFactory implements 
ModifiedHttpClientBuilderFactory {
+  /**
+   * A system property used to specify a properties file containing default 
parameters used for creating a HTTP client. This is specifically useful for 
configuring the HTTP basic auth credentials
+   * (i.e. username/password). The name of the property must match the 
relevant Solr config property name.
+   */
+  public static final String SYS_PROP_HTTP_CLIENT_CONFIG = 
"solr.httpclient.config";
+
+  /**
+   * A system property to configure the Basic auth credentials via a java 
system property. Since this will expose the password on the command-line, it is 
not very secure. But this mechanism is added
+   * for backwards compatibility.
+   */
+  public static final String SYS_PROP_BASIC_AUTH_CREDENTIALS = "basicauth";
+
+  private static PreemptiveAuth requestInterceptor = new PreemptiveAuth(new 
BasicScheme());
+
+  private static CredentialsResolver CREDENTIAL_RESOLVER = new 
CredentialsResolver();
+
  /**
   * This method enables configuring system wide defaults (apart from using a config file
   * based approach).
   */
  public static void setDefaultSolrParams(final SolrParams params) {
    CREDENTIAL_RESOLVER.defaultParams = params;
  }

  @Override
  public void close() throws IOException {
    // NOTE(review): requestInterceptor is static and shared, so closing one factory
    // instance unregisters preemptive auth for every instance in the JVM — confirm
    // this is the intended lifecycle.
    HttpClientUtil.removeRequestInterceptor(requestInterceptor);
  }
+
+  @Override
+  public void setup(final ModifiedHttp2SolrClient client) {
+    final String basicAuthUser = 
CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
+    final String basicAuthPass = 
CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
+    this.setup(client, basicAuthUser, basicAuthPass);
+  }
+
  /**
   * Installs Basic authentication on the client's underlying Jetty HttpClient and
   * re-registers the WWW/Proxy authentication protocol handlers so the credentials are
   * applied to challenges.
   *
   * @throws IllegalArgumentException if either credential is null
   */
  public void setup(final ModifiedHttp2SolrClient client, final String basicAuthUser, final String basicAuthPass) {
    if (basicAuthUser == null || basicAuthPass == null) {
      throw new IllegalArgumentException("username & password must be specified with " + getClass().getName());
    }

    final HttpAuthenticationStore authenticationStore = new HttpAuthenticationStore();
    authenticationStore.addAuthentication(new SolrBasicAuthentication(basicAuthUser, basicAuthPass));
    client.getHttpClient().setAuthenticationStore(authenticationStore);
    client.getProtocolHandlers().put(new WWWAuthenticationProtocolHandler(client.getHttpClient()));
    client.getProtocolHandlers().put(new ProxyAuthenticationProtocolHandler(client.getHttpClient()));
  }
+
+  @Override
+  public SolrHttpClientBuilder getHttpClientBuilder(final 
SolrHttpClientBuilder builder) {
+    final String basicAuthUser = 
CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
+    final String basicAuthPass = 
CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
+    if (basicAuthUser == null || basicAuthPass == null) {
+      throw new IllegalArgumentException("username & password must be 
specified with " + getClass().getName());
+    }
+
+    return initHttpClientBuilder(builder == null ? 
SolrHttpClientBuilder.create() : builder, basicAuthUser, basicAuthPass);
+  }
+
  /**
   * Wires a credentials provider into the builder and registers the shared preemptive
   * auth interceptor so credentials are sent without waiting for a 401 challenge.
   */
  private SolrHttpClientBuilder initHttpClientBuilder(final SolrHttpClientBuilder builder, final String basicAuthUser, final String basicAuthPass) {
    builder.setDefaultCredentialsProvider(() -> {
      final CredentialsProvider credsProvider = new BasicCredentialsProvider();
      credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(basicAuthUser, basicAuthPass));
      return credsProvider;
    });

    // Registered globally with HttpClientUtil; removed again in close().
    HttpClientUtil.addRequestInterceptor(requestInterceptor);
    return builder;
  }
+
+  static class CredentialsResolver {
+
+    public volatile SolrParams defaultParams;
+
+    public CredentialsResolver() {
+      final String credentials = 
System.getProperty(PreemptiveBasicAuthClientBuilderFactory.SYS_PROP_BASIC_AUTH_CREDENTIALS);
+      final String configFile = 
System.getProperty(PreemptiveBasicAuthClientBuilderFactory.SYS_PROP_HTTP_CLIENT_CONFIG);
+
+      if (credentials != null && configFile != null) {
+        throw new IllegalArgumentException("Basic authentication credentials 
passed via a configuration file" + " as well as java system property. Please 
choose one mechanism!");
+      }
+
+      if (credentials != null) {
+        final List<String> ss = StrUtils.splitSmart(credentials, ':');
+        if (ss.size() != 2 || StringUtils.isEmpty(ss.get(0)) || 
StringUtils.isEmpty(ss.get(1))) {
+          throw new IllegalArgumentException("Invalid Authentication 
credentials: Please provide 'basicauth' in the 'user:password' format");
+        }
+        final Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(HttpClientUtil.PROP_BASIC_AUTH_USER, ss.get(0));
+        paramMap.put(HttpClientUtil.PROP_BASIC_AUTH_PASS, ss.get(1));
+        defaultParams = new MapSolrParams(paramMap);
+      } else if (configFile != null) {
+        final Properties defaultProps = new Properties();
+        try (BufferedReader reader = 
Files.newBufferedReader(Paths.get(configFile), StandardCharsets.UTF_8)) {
+          defaultProps.load(reader);
+        } catch (final IOException e) {
+          throw new IllegalArgumentException("Unable to read credentials file 
at " + configFile, e);
+        }
+        final Map<String, String> map = new HashMap<>();
+        defaultProps.forEach((k, v) -> map.put((String) k, (String) v));
+        defaultParams = new MapSolrParams(map);
+      } else {
+        defaultParams = null;
+      }
+    }
+  }
+}

Added: 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java
URL: 
http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java?rev=1909097&view=auto
==============================================================================
--- 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java
 (added)
+++ 
manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java
 Wed Apr 12 14:35:38 2023
@@ -0,0 +1,520 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.XML;
+
+public class ModifiedUpdateRequest extends AbstractUpdateRequest {
+
+  public static final String REPFACT = "rf";
+  public static final String VER = "ver";
+  public static final String OVERWRITE = "ow";
+  public static final String COMMIT_WITHIN = "cw";
+  private Map<SolrInputDocument, Map<String, Object>> documents = null;
+  private Iterator<SolrInputDocument> docIterator = null;
+  private Map<String, Map<String, Object>> deleteById = null;
+  private List<String> deleteQuery = null;
+
+  private boolean isLastDocInBatch = false;
+
+  public ModifiedUpdateRequest() {
+    super(METHOD.POST, "/update");
+  }
+
+  public ModifiedUpdateRequest(final String url) {
+    super(METHOD.POST, url);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // 
---------------------------------------------------------------------------
+
+  /** clear the pending documents and delete commands */
+  public void clear() {
+    if (documents != null) {
+      documents.clear();
+    }
+    if (deleteById != null) {
+      deleteById.clear();
+    }
+    if (deleteQuery != null) {
+      deleteQuery.clear();
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Add a SolrInputDocument to this request
+   *
+   * @throws NullPointerException if the document is null
+   */
+  public ModifiedUpdateRequest add(final SolrInputDocument doc) {
+    Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument");
+    if (documents == null) {
+      documents = new LinkedHashMap<>();
+    }
+    documents.put(doc, null);
+    return this;
+  }
+
+  public ModifiedUpdateRequest add(final String... fields) {
+    return add(new SolrInputDocument(fields));
+  }
+
+  /**
+   * Add a SolrInputDocument to this request
+   *
+   * @param doc       the document
+   * @param overwrite true if the document should overwrite existing docs with 
the same id
+   * @throws NullPointerException if the document is null
+   */
+  public ModifiedUpdateRequest add(final SolrInputDocument doc, final Boolean 
overwrite) {
+    return add(doc, null, overwrite);
+  }
+
+  /**
+   * Add a SolrInputDocument to this request
+   *
+   * @param doc          the document
+   * @param commitWithin the time horizon by which the document should be 
committed (in ms)
+   * @throws NullPointerException if the document is null
+   */
+  public ModifiedUpdateRequest add(final SolrInputDocument doc, final Integer 
commitWithin) {
+    return add(doc, commitWithin, null);
+  }
+
+  /**
+   * Add a SolrInputDocument to this request
+   *
+   * @param doc          the document
+   * @param commitWithin the time horizon by which the document should be 
committed (in ms)
+   * @param overwrite    true if the document should overwrite existing docs 
with the same id
+   * @throws NullPointerException if the document is null
+   */
+  public ModifiedUpdateRequest add(final SolrInputDocument doc, final Integer 
commitWithin, final Boolean overwrite) {
+    Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument");
+    if (documents == null) {
+      documents = new LinkedHashMap<>();
+    }
+    final Map<String, Object> params = new HashMap<>(2);
+    if (commitWithin != null)
+      params.put(COMMIT_WITHIN, commitWithin);
+    if (overwrite != null)
+      params.put(OVERWRITE, overwrite);
+
+    documents.put(doc, params);
+
+    return this;
+  }
+
+  /**
+   * Add a collection of SolrInputDocuments to this request
+   *
+   * @throws NullPointerException if any of the documents in the collection 
are null
+   */
+  public ModifiedUpdateRequest add(final Collection<SolrInputDocument> docs) {
+    if (documents == null) {
+      documents = new LinkedHashMap<>();
+    }
+    for (final SolrInputDocument doc : docs) {
+      Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument");
+      documents.put(doc, null);
+    }
+    return this;
+  }
+
+  public ModifiedUpdateRequest deleteById(final String id) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<>();
+    }
+    deleteById.put(id, null);
+    return this;
+  }
+
+  public ModifiedUpdateRequest deleteById(final String id, final String route) 
{
+    return deleteById(id, route, null);
+  }
+
+  public ModifiedUpdateRequest deleteById(final String id, final String route, 
final Long version) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<>();
+    }
+    final Map<String, Object> params = (route == null && version == null) ? 
null : new HashMap<>(1);
+    if (version != null)
+      params.put(VER, version);
+    if (route != null)
+      params.put(_ROUTE_, route);
+    deleteById.put(id, params);
+    return this;
+  }
+
+  public ModifiedUpdateRequest deleteById(final List<String> ids) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<>();
+    }
+
+    for (final String id : ids) {
+      deleteById.put(id, null);
+    }
+
+    return this;
+  }
+
+  public ModifiedUpdateRequest deleteById(final String id, final Long version) 
{
+    return deleteById(id, null, version);
+  }
+
+  public ModifiedUpdateRequest deleteByQuery(final String q) {
+    if (deleteQuery == null) {
+      deleteQuery = new ArrayList<>();
+    }
+    deleteQuery.add(q);
+    return this;
+  }
+
+  public ModifiedUpdateRequest withRoute(final String route) {
+    if (params == null)
+      params = new ModifiableSolrParams();
+    params.set(_ROUTE_, route);
+    return this;
+  }
+
+  public UpdateResponse commit(final SolrClient client, final String 
collection) throws IOException, SolrServerException {
+    if (params == null)
+      params = new ModifiableSolrParams();
+    params.set(UpdateParams.COMMIT, "true");
+    return process(client, collection);
+  }
+
+  private interface ReqSupplier<T extends ModifiedLBSolrClient.Req> {
+    T get(ModifiedUpdateRequest request, List<String> servers);
+  }
+
+  private <T extends ModifiedLBSolrClient.Req> Map<String, T> getRoutes(final 
DocRouter router, final DocCollection col, final Map<String, List<String>> 
urlMap, final ModifiableSolrParams params,
+      final String idField, final ReqSupplier<T> reqSupplier) {
+    if ((documents == null || documents.size() == 0) && (deleteById == null || 
deleteById.size() == 0)) {
+      return null;
+    }
+
+    final Map<String, T> routes = new HashMap<>();
+    if (documents != null) {
+      final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = 
documents.entrySet();
+      for (final Entry<SolrInputDocument, Map<String, Object>> entry : 
entries) {
+        final SolrInputDocument doc = entry.getKey();
+        final Object id = doc.getFieldValue(idField);
+        if (id == null) {
+          return null;
+        }
+        final Slice slice = router.getTargetSlice(id.toString(), doc, null, 
null, col);
+        if (slice == null) {
+          return null;
+        }
+        final List<String> urls = urlMap.get(slice.getName());
+        if (urls == null) {
+          return null;
+        }
+        final String leaderUrl = urls.get(0);
+        T request = routes.get(leaderUrl);
+        if (request == null) {
+          final ModifiedUpdateRequest updateRequest = new 
ModifiedUpdateRequest();
+          updateRequest.setMethod(getMethod());
+          updateRequest.setCommitWithin(getCommitWithin());
+          updateRequest.setParams(params);
+          updateRequest.setPath(getPath());
+          updateRequest.setBasicAuthCredentials(getBasicAuthUser(), 
getBasicAuthPassword());
+          updateRequest.setResponseParser(getResponseParser());
+          request = reqSupplier.get(updateRequest, urls);
+          routes.put(leaderUrl, request);
+        }
+        final ModifiedUpdateRequest urequest = (ModifiedUpdateRequest) 
request.getRequest();
+        final Map<String, Object> value = entry.getValue();
+        Boolean ow = null;
+        if (value != null) {
+          ow = (Boolean) value.get(OVERWRITE);
+        }
+        if (ow != null) {
+          urequest.add(doc, ow);
+        } else {
+          urequest.add(doc);
+        }
+      }
+    }
+
+    // Route the deleteById's
+
+    if (deleteById != null) {
+
+      final Iterator<Map.Entry<String, Map<String, Object>>> entries = 
deleteById.entrySet().iterator();
+      while (entries.hasNext()) {
+
+        final Map.Entry<String, Map<String, Object>> entry = entries.next();
+
+        final String deleteId = entry.getKey();
+        final Map<String, Object> map = entry.getValue();
+        Long version = null;
+        String route = null;
+        if (map != null) {
+          version = (Long) map.get(VER);
+          route = (String) map.get(_ROUTE_);
+        }
+        final Slice slice = router.getTargetSlice(deleteId, null, route, null, 
col);
+        if (slice == null) {
+          return null;
+        }
+        final List<String> urls = urlMap.get(slice.getName());
+        if (urls == null) {
+          return null;
+        }
+        final String leaderUrl = urls.get(0);
+        T request = routes.get(leaderUrl);
+        if (request != null) {
+          final ModifiedUpdateRequest urequest = (ModifiedUpdateRequest) 
request.getRequest();
+          urequest.deleteById(deleteId, route, version);
+        } else {
+          final ModifiedUpdateRequest urequest = new ModifiedUpdateRequest();
+          urequest.setParams(params);
+          urequest.deleteById(deleteId, route, version);
+          urequest.setCommitWithin(getCommitWithin());
+          urequest.setBasicAuthCredentials(getBasicAuthUser(), 
getBasicAuthPassword());
+          request = reqSupplier.get(urequest, urls);
+          routes.put(leaderUrl, request);
+        }
+      }
+    }
+
+    return routes;
+  }
+
+  /**
+   * @param router  to route updates with
+   * @param col     DocCollection for the updates
+   * @param urlMap  of the cluster
+   * @param params  params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   */
+  public Map<String, ModifiedLBSolrClient.Req> getRoutesToCollection(final 
DocRouter router, final DocCollection col, final Map<String, List<String>> 
urlMap, final ModifiableSolrParams params,
+      final String idField) {
+    return getRoutes(router, col, urlMap, params, idField, 
ModifiedLBSolrClient.Req::new);
+  }
+
+  public void setDocIterator(final Iterator<SolrInputDocument> docIterator) {
+    this.docIterator = docIterator;
+  }
+
+  public void setDeleteQuery(final List<String> deleteQuery) {
+    this.deleteQuery = deleteQuery;
+  }
+
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+
+  @Override
+  public Collection<ContentStream> getContentStreams() throws IOException {
+    return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
+  }
+
+  public String getXML() throws IOException {
+    final StringWriter writer = new StringWriter();
+    writeXML(writer);
+    writer.flush();
+
+    // If action is COMMIT or OPTIMIZE, it is sent with params
+    final String xml = writer.toString();
+    // System.out.println( "SEND:"+xml );
+    return (xml.length() > 0) ? xml : null;
+  }
+
+  private List<Map<SolrInputDocument, Map<String, Object>>> getDocLists(final 
Map<SolrInputDocument, Map<String, Object>> documents) {
+    final List<Map<SolrInputDocument, Map<String, Object>>> docLists = new 
ArrayList<>();
+    Map<SolrInputDocument, Map<String, Object>> docList = null;
+    if (this.documents != null) {
+
+      Boolean lastOverwrite = true;
+      Integer lastCommitWithin = -1;
+
+      final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = 
this.documents.entrySet();
+      for (final Entry<SolrInputDocument, Map<String, Object>> entry : 
entries) {
+        final Map<String, Object> map = entry.getValue();
+        Boolean overwrite = null;
+        Integer commitWithin = null;
+        if (map != null) {
+          overwrite = (Boolean) entry.getValue().get(OVERWRITE);
+          commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN);
+        }
+        if (!Objects.equals(overwrite, lastOverwrite) || 
!Objects.equals(commitWithin, lastCommitWithin) || docLists.isEmpty()) {
+          docList = new LinkedHashMap<>();
+          docLists.add(docList);
+        }
+        docList.put(entry.getKey(), entry.getValue());
+        lastCommitWithin = commitWithin;
+        lastOverwrite = overwrite;
+      }
+    }
+
+    if (docIterator != null) {
+      docList = new LinkedHashMap<>();
+      docLists.add(docList);
+      while (docIterator.hasNext()) {
+        final SolrInputDocument doc = docIterator.next();
+        if (doc != null) {
+          docList.put(doc, null);
+        }
+      }
+    }
+
+    return docLists;
+  }
+
+  /**
+   * @since solr 1.4
+   */
+  public ModifiedUpdateRequest writeXML(final Writer writer) throws 
IOException {
+    final List<Map<SolrInputDocument, Map<String, Object>>> getDocLists = 
getDocLists(documents);
+
+    for (final Map<SolrInputDocument, Map<String, Object>> docs : getDocLists) 
{
+
+      if ((docs != null && docs.size() > 0)) {
+        final Entry<SolrInputDocument, Map<String, Object>> firstDoc = 
docs.entrySet().iterator().next();
+        final Map<String, Object> map = firstDoc.getValue();
+        Integer cw = null;
+        Boolean ow = null;
+        if (map != null) {
+          cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
+          ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
+        }
+        if (ow == null)
+          ow = true;
+        final int commitWithin = (cw != null && cw != -1) ? cw : 
this.commitWithin;
+        final boolean overwrite = ow;
+        if (commitWithin > -1 || overwrite != true) {
+          writer.write("<add commitWithin=\"" + commitWithin + "\" " + 
"overwrite=\"" + overwrite + "\">");
+        } else {
+          writer.write("<add>");
+        }
+
+        final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = 
docs.entrySet();
+        for (final Entry<SolrInputDocument, Map<String, Object>> entry : 
entries) {
+          ClientUtils.writeXML(entry.getKey(), writer);
+        }
+
+        writer.write("</add>");
+      }
+    }
+
+    // Add the delete commands
+    final boolean deleteI = deleteById != null && deleteById.size() > 0;
+    final boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
+    if (deleteI || deleteQ) {
+      if (commitWithin > 0) {
+        writer.append("<delete 
commitWithin=\"").append(String.valueOf(commitWithin)).append("\">");
+      } else {
+        writer.append("<delete>");
+      }
+      if (deleteI) {
+        for (final Map.Entry<String, Map<String, Object>> entry : 
deleteById.entrySet()) {
+          writer.append("<id");
+          final Map<String, Object> map = entry.getValue();
+          if (map != null) {
+            final Long version = (Long) map.get(VER);
+            final String route = (String) map.get(_ROUTE_);
+            if (version != null) {
+              writer.append(" 
version=\"").append(String.valueOf(version)).append('"');
+            }
+
+            if (route != null) {
+              writer.append(" _route_=\"").append(route).append('"');
+            }
+          }
+          writer.append(">");
+
+          XML.escapeCharData(entry.getKey(), writer);
+          writer.append("</id>");
+        }
+      }
+      if (deleteQ) {
+        for (final String q : deleteQuery) {
+          writer.append("<query>");
+          XML.escapeCharData(q, writer);
+          writer.append("</query>");
+        }
+      }
+      writer.append("</delete>");
+    }
+    return this;
+  }
+
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+
+  // --------------------------------------------------------------------------
+  //
+  // --------------------------------------------------------------------------
+
+  public List<SolrInputDocument> getDocuments() {
+    if (documents == null)
+      return null;
+    final List<SolrInputDocument> docs = new ArrayList<>(documents.size());
+    docs.addAll(documents.keySet());
+    return docs;
+  }
+
+  public Map<SolrInputDocument, Map<String, Object>> getDocumentsMap() {
+    return documents;
+  }
+
+  public Iterator<SolrInputDocument> getDocIterator() {
+    return docIterator;
+  }
+
+  public List<String> getDeleteById() {
+    if (deleteById == null)
+      return null;
+    final List<String> deletes = new ArrayList<>(deleteById.keySet());
+    return deletes;
+  }
+
+  public Map<String, Map<String, Object>> getDeleteByIdMap() {
+    return deleteById;
+  }
+
+  public List<String> getDeleteQuery() {
+    return deleteQuery;
+  }
+
+  public boolean isLastDocInBatch() {
+    return isLastDocInBatch;
+  }
+
+  public void lastDocInBatch() {
+    isLastDocInBatch = true;
+  }
+
+}

Modified: manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml
URL: 
http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml?rev=1909097&r1=1909096&r2=1909097&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml (original)
+++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml Wed Apr 12 
14:35:38 2023
@@ -244,6 +244,24 @@
       <version>${slf4j.version}</version>
     </dependency>
     
+    <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-client</artifactId>
+        <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.eclipse.jetty.http2</groupId>
+        <artifactId>http2-http-client-transport</artifactId>
+        <version>${jetty.version}</version>
+    </dependency>
+
+
+
+
+
+
+
+    
     <!-- Testing dependencies -->
     
     <dependency>
@@ -325,7 +343,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
       <version>${jetty.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
@@ -343,7 +360,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-http</artifactId>
       <version>${jetty.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>

Modified: 
manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java
URL: 
http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java?rev=1909097&r1=1909096&r2=1909097&view=diff
==============================================================================
--- 
manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java
 (original)
+++ 
manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java
 Wed Apr 12 14:35:38 2023
@@ -18,31 +18,31 @@
 */
 package org.apache.manifoldcf.core.lockmanager;
 
-import java.util.*;
-import java.io.*;
-import org.apache.zookeeper.server.*;
-import org.apache.zookeeper.server.quorum.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
-public class ZooKeeperInstance
-{
+public class ZooKeeperInstance {
   protected final int zkPort;
   protected final File tempDir;
-  
+
   protected ZooKeeperThread zookeeperThread = null;
-  
-  public ZooKeeperInstance(int zkPort, File tempDir)
-  {
+
+  public ZooKeeperInstance(final int zkPort, final File tempDir) {
     this.zkPort = zkPort;
     this.tempDir = tempDir;
   }
 
-  public void start()
-    throws Exception
-  {
-    Properties startupProperties = new Properties();
-    startupProperties.setProperty("tickTime","2000");
-    startupProperties.setProperty("dataDir",tempDir.toString());
-    startupProperties.setProperty("clientPort",Integer.toString(zkPort));
+  public void start() throws Exception {
+    final Properties startupProperties = new Properties();
+    startupProperties.setProperty("tickTime", "2000");
+    startupProperties.setProperty("dataDir", tempDir.toString());
+    startupProperties.setProperty("clientPort", Integer.toString(zkPort));
+    startupProperties.setProperty("admin.enableServer", "false");
 
     final QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
     quorumConfiguration.parseProperties(startupProperties);
@@ -53,33 +53,26 @@ public class ZooKeeperInstance
     zookeeperThread = new ZooKeeperThread(configuration);
     zookeeperThread.start();
     // We have no way of knowing whether zookeeper is alive or not, but the
-    // client is supposed to know about that.  But it doesn't, so wait for 5 
seconds
+    // client is supposed to know about that. But it doesn't, so wait for 5 
seconds
     Thread.sleep(5000L);
   }
-  
-  public void stop()
-    throws Exception
-  {
-    while (true)
-    {
+
+  public void stop() throws Exception {
+    while (true) {
       if (zookeeperThread == null)
         break;
-      else if (!zookeeperThread.isAlive())
-      {
-        Throwable e = zookeeperThread.finishUp();
-        if (e != null)
-        {
+      else if (!zookeeperThread.isAlive()) {
+        final Throwable e = zookeeperThread.finishUp();
+        if (e != null) {
           if (e instanceof RuntimeException)
-            throw (RuntimeException)e;
+            throw (RuntimeException) e;
           else if (e instanceof Exception)
-            throw (Exception)e;
+            throw (Exception) e;
           else if (e instanceof Error)
-            throw (Error)e;
+            throw (Error) e;
         }
         zookeeperThread = null;
-      }
-      else
-      {
+      } else {
         // This isn't the best way to kill zookeeper but it's the only way
         // we've got.
         zookeeperThread.interrupt();
@@ -87,42 +80,33 @@ public class ZooKeeperInstance
       }
     }
   }
-  
-  protected static class ZooKeeperThread extends Thread
-  {
+
+  protected static class ZooKeeperThread extends Thread {
     protected final ServerConfig config;
-    
+
     protected Throwable exception = null;
-    
-    public ZooKeeperThread(ServerConfig config)
-    {
+
+    public ZooKeeperThread(final ServerConfig config) {
       this.config = config;
     }
-    
-    public void run()
-    {
-      try
-      {
-        ZooKeeperServerMain server = new ZooKeeperServerMain();
+
+    @Override
+    public void run() {
+      try {
+        final ZooKeeperServerMain server = new ZooKeeperServerMain();
         server.runFromConfig(config);
-      }
-      catch (IOException e)
-      {
+      } catch (final IOException e) {
         // Ignore IOExceptions, since that seems to be normal when shutting
         // down zookeeper via thread.interrupt()
-      }
-      catch (Throwable e)
-      {
+      } catch (final Throwable e) {
         exception = e;
       }
     }
-    
-    public Throwable finishUp()
-      throws InterruptedException
-    {
+
+    public Throwable finishUp() throws InterruptedException {
       join();
       return exception;
     }
   }
-  
+
 }


Reply via email to