Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java?rev=1909097&view=auto ============================================================================== --- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java (added) +++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBSolrClient.java Wed Apr 12 14:35:38 2023 @@ -0,0 +1,691 @@ +package org.apache.manifoldcf.agents.output.solr; + +import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.ConnectException; +import java.net.MalformedURLException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.client.solrj.ResponseParser; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.BaseHttpSolrClient; +import org.apache.solr.client.solrj.request.IsUpdateRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.slf4j.MDC; + +public abstract class ModifiedLBSolrClient extends SolrClient { + + // defaults + protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500)); + private static final int CHECK_INTERVAL = 60 * 1000; // 1 minute between checks + private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list + + // keys to the maps are currently of the form "http://localhost:8983/solr" + // which should be equivalent to HttpSolrServer.getBaseURL() + private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>(); + // access to aliveServers should be synchronized on itself + + protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>(); + + // changes to aliveServers are reflected in this array, no need to synchronize + private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0]; + + private volatile ScheduledExecutorService aliveCheckExecutor; + + private int interval = CHECK_INTERVAL; + private final AtomicInteger counter = new AtomicInteger(-1); + + private static final SolrQuery solrQuery = new SolrQuery("*:*"); + protected volatile ResponseParser parser; + protected volatile RequestWriter requestWriter; + + protected Set<String> queryParams = new HashSet<>(); + + static { + solrQuery.setRows(0); + /** + * Default sort (if we don't supply a sort) is by score and since we request 0 rows any sorting and scoring is not necessary. SolrQuery.DOCID schema-independently specifies a non-scoring sort. + * <code>_docid_ asc</code> sort is efficient, <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort. + */ + solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc); + // not a top-level request, we are interested only in the server being sent to i.e. it need not + // distribute our request to further servers + solrQuery.setDistrib(false); + } + + protected static class ServerWrapper { + final String baseUrl; + + // "standard" servers are used by default. They normally live in the alive list + // and move to the zombie list when unavailable. When they become available again, + // they move back to the alive list. + boolean standard = true; + + int failedPings = 0; + + ServerWrapper(final String baseUrl) { + this.baseUrl = baseUrl; + } + + public String getBaseUrl() { + return baseUrl; + } + + @Override + public String toString() { + return baseUrl; + } + + @Override + public int hashCode() { + return baseUrl.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (!(obj instanceof ServerWrapper)) + return false; + return baseUrl.equals(((ServerWrapper) obj).baseUrl); + } + } + + protected static class ServerIterator { + String serverStr; + List<String> skipped; + int numServersTried; + Iterator<String> it; + Iterator<String> skippedIt; + String exceptionMessage; + long timeAllowedNano; + long timeOutTime; + + final Map<String, ServerWrapper> zombieServers; + final Req req; + + public ServerIterator(final Req req, final Map<String, ServerWrapper> zombieServers) { + this.it = req.getServers().iterator(); + this.req = req; + this.zombieServers = zombieServers; + this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest()); + this.timeOutTime = System.nanoTime() + timeAllowedNano; + fetchNext(); + } + + public synchronized boolean hasNext() { + return serverStr != null; + } + + private void fetchNext() { + serverStr = null; + if (req.numServersToTry != null && numServersTried > req.numServersToTry) { + exceptionMessage = "Time allowed to handle this request exceeded"; + return; + } + + while (it.hasNext()) { + serverStr = it.next(); + serverStr = normalize(serverStr); + // if the server is currently a zombie, just skip to the next one + final ServerWrapper wrapper = zombieServers.get(serverStr); + if (wrapper != null) { + final int numDeadServersToTry = req.getNumDeadServersToTry(); + if (numDeadServersToTry > 0) { + if (skipped == null) { + skipped = new ArrayList<>(numDeadServersToTry); + skipped.add(wrapper.getBaseUrl()); + } else if (skipped.size() < numDeadServersToTry) { + skipped.add(wrapper.getBaseUrl()); + } + } + continue; + } + + break; + } + if (serverStr == null && skipped != null) { + if (skippedIt == null) { + skippedIt = skipped.iterator(); + } + if (skippedIt.hasNext()) { + serverStr = skippedIt.next(); + } + } + } + + boolean isServingZombieServer() { + return skippedIt != null; + } + + public synchronized String nextOrError() throws SolrServerException { + return nextOrError(null); + } + + public synchronized String nextOrError(final Exception previousEx) throws SolrServerException { + String suffix = ""; + if (previousEx == null) { + suffix = ":" + zombieServers.keySet(); + } + // Skipping check time exceeded for the first request + if (numServersTried > 0 && isTimeExceeded(timeAllowedNano, timeOutTime)) { + throw new SolrServerException("Time allowed to handle this request exceeded" + suffix, previousEx); + } + if (serverStr == null) { + throw new SolrServerException("No live SolrServers available to handle this request" + suffix, previousEx); + } + numServersTried++; + if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) { + throw new SolrServerException("No live SolrServers available to handle this request:" + " numServersTried=" + numServersTried + " numServersToTry=" + req.getNumServersToTry() + suffix, + previousEx); + } + final String rs = serverStr; + fetchNext(); + return rs; + } + } + + // Req should be parameterized too, but that touches a whole lotta code + public static class Req { + protected SolrRequest<?> request; + protected List<String> servers; + protected int numDeadServersToTry; + private final Integer numServersToTry; + + public Req(final SolrRequest<?> request, final List<String> servers) { + this(request, servers, null); + } + + public Req(final SolrRequest<?> request, final List<String> servers, final Integer numServersToTry) { + this.request = request; + this.servers = servers; + this.numDeadServersToTry = servers.size(); + this.numServersToTry = numServersToTry; + } + + public SolrRequest<?> getRequest() { + return request; + } + + public List<String> getServers() { + return servers; + } + + /** + * @return the number of dead servers to try if there are no live servers left + */ + public int getNumDeadServersToTry() { + return numDeadServersToTry; + } + + /** + * @param numDeadServersToTry The number of dead servers to try if there are no live servers left. Defaults to the number of servers in this request. + */ + public void setNumDeadServersToTry(final int numDeadServersToTry) { + this.numDeadServersToTry = numDeadServersToTry; + } + + public Integer getNumServersToTry() { + return numServersToTry; + } + } + + public static class Rsp { + protected String server; + protected NamedList<Object> rsp; + + /** The response from the server */ + public NamedList<Object> getResponse() { + return rsp; + } + + /** The server that returned the response */ + public String getServer() { + return server; + } + } + + public ModifiedLBSolrClient(final List<String> baseSolrUrls) { + if (!baseSolrUrls.isEmpty()) { + for (final String s : baseSolrUrls) { + final ServerWrapper wrapper = createServerWrapper(s); + aliveServers.put(wrapper.getBaseUrl(), wrapper); + } + updateAliveList(); + } + } + + protected void updateAliveList() { + synchronized (aliveServers) { + aliveServerList = aliveServers.values().toArray(new ServerWrapper[0]); + } + } + + protected ServerWrapper createServerWrapper(final String baseUrl) { + return new ServerWrapper(baseUrl); + } + + public Set<String> getQueryParams() { + return queryParams; + } + + /** + * Expert Method. + * + * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(final Set<String> queryParams) { + this.queryParams = queryParams; + } + + public void addQueryParams(final String queryOnlyParam) { + this.queryParams.add(queryOnlyParam); + } + + public static String normalize(String server) { + if (server.endsWith("/")) + server = server.substring(0, server.length() - 1); + return server; + } + + /** + * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped. If a request fails due to an IOException, the server is moved to the dead pool for a certain + * period of time, or until a test request on that server succeeds. + * + * <p> + * Servers are queried in the exact order given (except servers currently in the dead pool are skipped). If no live servers from the provided list remain to be tried, a number of previously skipped + * dead servers will be tried. Req.getNumDeadServersToTry() controls how many dead servers will be tried. + * + * <p> + * If no live servers are found a SolrServerException is thrown. + * + * @param req contains both the request as well as the list of servers to query + * @return the result of the request + * @throws IOException If there is a low-level I/O error. + */ + public Rsp request(final Req req) throws SolrServerException, IOException { + final Rsp rsp = new Rsp(); + Exception ex = null; + final boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath()); + final ServerIterator serverIterator = new ServerIterator(req, zombieServers); + String serverStr; + while ((serverStr = serverIterator.nextOrError(ex)) != null) { + try { + MDC.put("ModifiedLBSolrClient.url", serverStr); + ex = doRequest(serverStr, req, rsp, isNonRetryable, serverIterator.isServingZombieServer()); + if (ex == null) { + return rsp; // SUCCESS + } + } finally { + MDC.remove("ModifiedLBSolrClient.url"); + } + } + throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex); + } + + /** + * @return time allowed in nanos, returns -1 if no time_allowed is specified. + */ + private static long getTimeAllowedInNanos(final SolrRequest<?> req) { + final SolrParams reqParams = req.getParams(); + return reqParams == null ? -1 : TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS); + } + + private static boolean isTimeExceeded(final long timeAllowedNano, final long timeOutTime) { + return timeAllowedNano > 0 && System.nanoTime() > timeOutTime; + } + + protected Exception doRequest(final String baseUrl, final Req req, final Rsp rsp, final boolean isNonRetryable, final boolean isZombie) throws SolrServerException, IOException { + Exception ex = null; + try { + rsp.server = baseUrl; + req.getRequest().setBasePath(baseUrl); + rsp.rsp = getClient(baseUrl).request(req.getRequest(), (String) null); + if (isZombie) { + zombieServers.remove(baseUrl); + } + } catch (final BaseHttpSolrClient.RemoteExecutionException e) { + throw e; + } catch (final SolrException e) { + // we retry on 404 or 403 or 503 or 500 + // unless it's an update - then we only retry on connect exception + if (!isNonRetryable && RETRY_CODES.contains(e.code())) { + ex = (!isZombie) ? addZombie(baseUrl, e) : e; + } else { + // Server is alive but the request was likely malformed or invalid + if (isZombie) { + zombieServers.remove(baseUrl); + } + throw e; + } + } catch (final SocketException e) { + if (!isNonRetryable || e instanceof ConnectException) { + ex = (!isZombie) ? addZombie(baseUrl, e) : e; + } else { + throw e; + } + } catch (final SocketTimeoutException e) { + if (!isNonRetryable) { + ex = (!isZombie) ? addZombie(baseUrl, e) : e; + } else { + throw e; + } + } catch (final SolrServerException e) { + final Throwable rootCause = e.getRootCause(); + if (!isNonRetryable && rootCause instanceof IOException) { + ex = (!isZombie) ? addZombie(baseUrl, e) : e; + } else if (isNonRetryable && rootCause instanceof ConnectException) { + ex = (!isZombie) ? addZombie(baseUrl, e) : e; + } else { + throw e; + } + } catch (final Exception e) { + throw new SolrServerException(e); + } + + return ex; + } + + protected abstract SolrClient getClient(String baseUrl); + + protected Exception addZombie(final String serverStr, final Exception e) { + final ServerWrapper wrapper = createServerWrapper(serverStr); + wrapper.standard = false; + zombieServers.put(serverStr, wrapper); + startAliveCheckExecutor(); + return e; + } + + /** + * LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is alive. Use this to set that interval + * + * @param interval time in milliseconds + */ + public void setAliveCheckInterval(final int interval) { + if (interval <= 0) { + throw new IllegalArgumentException("Alive check interval must be " + "positive, specified value = " + interval); + } + this.interval = interval; + } + + private void startAliveCheckExecutor() { + // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor + // if it's not null. + if (aliveCheckExecutor == null) { + synchronized (this) { + if (aliveCheckExecutor == null) { + aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("aliveCheckExecutor")); + aliveCheckExecutor.scheduleAtFixedRate(getAliveCheckRunner(new WeakReference<>(this)), this.interval, this.interval, TimeUnit.MILLISECONDS); + } + } + } + } + + private static Runnable getAliveCheckRunner(final WeakReference<ModifiedLBSolrClient> lbRef) { + return () -> { + final ModifiedLBSolrClient lb = lbRef.get(); + if (lb != null && lb.zombieServers != null) { + for (final Object zombieServer : lb.zombieServers.values()) { + lb.checkAZombieServer((ServerWrapper) zombieServer); + } + } + }; + } + + public ResponseParser getParser() { + return parser; + } + + /** + * Changes the {@link ResponseParser} that will be used for the internal SolrServer objects. + * + * @param parser Default Response Parser chosen to parse the response if the parser were not specified as part of the request. + * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser() + */ + public void setParser(final ResponseParser parser) { + this.parser = parser; + } + + /** + * Changes the {@link RequestWriter} that will be used for the internal SolrServer objects. + * + * @param requestWriter Default RequestWriter, used to encode requests sent to the server. + */ + public void setRequestWriter(final RequestWriter requestWriter) { + this.requestWriter = requestWriter; + } + + public RequestWriter getRequestWriter() { + return requestWriter; + } + + private void checkAZombieServer(final ServerWrapper zombieServer) { + try { + final QueryRequest queryRequest = new QueryRequest(solrQuery); + queryRequest.setBasePath(zombieServer.baseUrl); + final QueryResponse resp = queryRequest.process(getClient(zombieServer.getBaseUrl())); + if (resp.getStatus() == 0) { + // server has come back up. + // make sure to remove from zombies before adding to alive to avoid a race condition + // where another thread could mark it down, move it back to zombie, and then we delete + // from zombie and lose it forever. + final ServerWrapper wrapper = zombieServers.remove(zombieServer.getBaseUrl()); + if (wrapper != null) { + wrapper.failedPings = 0; + if (wrapper.standard) { + addToAlive(wrapper); + } + } else { + // something else already moved the server from zombie to alive + } + } + } catch (final Exception e) { + // Expected. The server is still down. + zombieServer.failedPings++; + + // If the server doesn't belong in the standard set belonging to this load balancer + // then simply drop it after a certain number of failed pings. + if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) { + zombieServers.remove(zombieServer.getBaseUrl()); + } + } + } + + private ServerWrapper removeFromAlive(final String key) { + synchronized (aliveServers) { + final ServerWrapper wrapper = aliveServers.remove(key); + if (wrapper != null) + updateAliveList(); + return wrapper; + } + } + + private void addToAlive(final ServerWrapper wrapper) { + synchronized (aliveServers) { + final ServerWrapper prev = aliveServers.put(wrapper.getBaseUrl(), wrapper); + // TODO: warn if there was a previous entry? + updateAliveList(); + } + } + + public void addSolrServer(final String server) throws MalformedURLException { + addToAlive(createServerWrapper(server)); + } + + public String removeSolrServer(String server) { + try { + server = new URL(server).toExternalForm(); + } catch (final MalformedURLException e) { + throw new RuntimeException(e); + } + if (server.endsWith("/")) { + server = server.substring(0, server.length() - 1); + } + + // there is a small race condition here - if the server is in the process of being moved between + // lists, we could fail to remove it. + removeFromAlive(server); + zombieServers.remove(server); + return null; + } + + /** + * Tries to query a live server. A SolrServerException is thrown if all servers are dead. If the request failed due to IOException then the live server is moved to dead pool and the request is + * retried on another live server. After live servers are exhausted, any servers previously marked as dead will be tried before failing the request. + * + * @param request the SolrRequest. + * @return response + * @throws IOException If there is a low-level I/O error. + */ + @Override + public NamedList<Object> request(final SolrRequest<?> request, final String collection) throws SolrServerException, IOException { + return request(request, collection, null); + } + + public NamedList<Object> request(final SolrRequest<?> request, final String collection, final Integer numServersToTry) throws SolrServerException, IOException { + Exception ex = null; + final ServerWrapper[] serverList = aliveServerList; + + final int maxTries = (numServersToTry == null ? serverList.length : numServersToTry.intValue()); + int numServersTried = 0; + Map<String, ServerWrapper> justFailed = null; + + boolean timeAllowedExceeded = false; + final long timeAllowedNano = getTimeAllowedInNanos(request); + final long timeOutTime = System.nanoTime() + timeAllowedNano; + for (int attempts = 0; attempts < maxTries; attempts++) { + if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + final ServerWrapper wrapper = pickServer(serverList, request); + try { + ++numServersTried; + request.setBasePath(wrapper.baseUrl); + return getClient(wrapper.getBaseUrl()).request(request, collection); + } catch (final SolrException e) { + // Server is alive but the request was malformed or invalid + throw e; + } catch (final SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + moveAliveToDead(wrapper); + if (justFailed == null) + justFailed = new HashMap<>(); + justFailed.put(wrapper.getBaseUrl(), wrapper); + } else { + throw e; + } + } catch (final Exception e) { + throw new SolrServerException(e); + } + } + + // try other standard servers that we didn't try just now + for (final ServerWrapper wrapper : zombieServers.values()) { + if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + if (wrapper.standard == false || justFailed != null && justFailed.containsKey(wrapper.getBaseUrl())) + continue; + try { + ++numServersTried; + request.setBasePath(wrapper.baseUrl); + final NamedList<Object> rsp = getClient(wrapper.baseUrl).request(request, collection); + // remove from zombie list *before* adding to alive to avoid a race that could lose a server + zombieServers.remove(wrapper.getBaseUrl()); + addToAlive(wrapper); + return rsp; + } catch (final SolrException e) { + // Server is alive but the request was malformed or invalid + throw e; + } catch (final SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + // still dead + } else { + throw e; + } + } catch (final Exception e) { + throw new SolrServerException(e); + } + } + + final String solrServerExceptionMessage; + if (timeAllowedExceeded) { + solrServerExceptionMessage = "Time allowed to handle this request exceeded"; + } else { + if (numServersToTry != null && numServersTried > numServersToTry.intValue()) { + solrServerExceptionMessage = "No live SolrServers available to handle this request:" + " numServersTried=" + numServersTried + " numServersToTry=" + numServersToTry.intValue(); + } else { + solrServerExceptionMessage = "No live SolrServers available to handle this request"; + } + } + if (ex == null) { + throw new SolrServerException(solrServerExceptionMessage); + } else { + throw new SolrServerException(solrServerExceptionMessage, ex); + } + } + + /** + * Pick a server from list to execute request. By default servers are picked in round-robin manner, custom classes can override this method for more advance logic + * + * @param aliveServerList list of currently alive servers + * @param request the request will be sent to the picked server + * @return the picked server + */ + protected ServerWrapper pickServer(final ServerWrapper[] aliveServerList, final SolrRequest<?> request) { + final int count = counter.incrementAndGet() & Integer.MAX_VALUE; + return aliveServerList[count % aliveServerList.length]; + } + + private void moveAliveToDead(ServerWrapper wrapper) { + wrapper = removeFromAlive(wrapper.getBaseUrl()); + if (wrapper == null) + return; // another thread already detected the failure and removed it + zombieServers.put(wrapper.getBaseUrl(), wrapper); + startAliveCheckExecutor(); + } + + @Override + public void close() { + synchronized (this) { + if (aliveCheckExecutor != null) { + aliveCheckExecutor.shutdownNow(); + ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor); + } + } + } + +}
Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java?rev=1909097&view=auto ============================================================================== --- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java (added) +++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedPreemptiveBasicAuthClientBuilderFactory.java Wed Apr 12 14:35:38 2023 @@ -0,0 +1,137 @@ +package org.apache.manifoldcf.agents.output.solr; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.PreemptiveAuth; +import org.apache.solr.client.solrj.impl.PreemptiveBasicAuthClientBuilderFactory; +import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder; +import org.apache.solr.client.solrj.util.SolrBasicAuthentication; +import org.apache.solr.common.StringUtils; +import org.apache.solr.common.params.MapSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.StrUtils; +import org.eclipse.jetty.client.HttpAuthenticationStore; +import org.eclipse.jetty.client.ProxyAuthenticationProtocolHandler; +import org.eclipse.jetty.client.WWWAuthenticationProtocolHandler; + +public class ModifiedPreemptiveBasicAuthClientBuilderFactory implements ModifiedHttpClientBuilderFactory { + /** + * A system property used to specify a properties file containing default parameters used for creating a HTTP client. This is specifically useful for configuring the HTTP basic auth credentials + * (i.e. username/password). The name of the property must match the relevant Solr config property name. + */ + public static final String SYS_PROP_HTTP_CLIENT_CONFIG = "solr.httpclient.config"; + + /** + * A system property to configure the Basic auth credentials via a java system property. Since this will expose the password on the command-line, it is not very secure. But this mechanism is added + * for backwards compatibility. + */ + public static final String SYS_PROP_BASIC_AUTH_CREDENTIALS = "basicauth"; + + private static PreemptiveAuth requestInterceptor = new PreemptiveAuth(new BasicScheme()); + + private static CredentialsResolver CREDENTIAL_RESOLVER = new CredentialsResolver(); + + /** + * This method enables configuring system wide defaults (apart from using a config file based approach). + */ + public static void setDefaultSolrParams(final SolrParams params) { + CREDENTIAL_RESOLVER.defaultParams = params; + } + + @Override + public void close() throws IOException { + HttpClientUtil.removeRequestInterceptor(requestInterceptor); + } + + @Override + public void setup(final ModifiedHttp2SolrClient client) { + final String basicAuthUser = CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_USER); + final String basicAuthPass = CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_PASS); + this.setup(client, basicAuthUser, basicAuthPass); + } + + public void setup(final ModifiedHttp2SolrClient client, final String basicAuthUser, final String basicAuthPass) { + if (basicAuthUser == null || basicAuthPass == null) { + throw new IllegalArgumentException("username & password must be specified with " + getClass().getName()); + } + + final HttpAuthenticationStore authenticationStore = new HttpAuthenticationStore(); + authenticationStore.addAuthentication(new SolrBasicAuthentication(basicAuthUser, basicAuthPass)); + client.getHttpClient().setAuthenticationStore(authenticationStore); + client.getProtocolHandlers().put(new WWWAuthenticationProtocolHandler(client.getHttpClient())); + client.getProtocolHandlers().put(new ProxyAuthenticationProtocolHandler(client.getHttpClient())); + } + + @Override + public SolrHttpClientBuilder getHttpClientBuilder(final SolrHttpClientBuilder builder) { + final String basicAuthUser = CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_USER); + final String basicAuthPass = CREDENTIAL_RESOLVER.defaultParams.get(HttpClientUtil.PROP_BASIC_AUTH_PASS); + if (basicAuthUser == null || basicAuthPass == null) { + throw new IllegalArgumentException("username & password must be specified with " + getClass().getName()); + } + + return initHttpClientBuilder(builder == null ? SolrHttpClientBuilder.create() : builder, basicAuthUser, basicAuthPass); + } + + private SolrHttpClientBuilder initHttpClientBuilder(final SolrHttpClientBuilder builder, final String basicAuthUser, final String basicAuthPass) { + builder.setDefaultCredentialsProvider(() -> { + final CredentialsProvider credsProvider = new BasicCredentialsProvider(); + credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(basicAuthUser, basicAuthPass)); + return credsProvider; + }); + + HttpClientUtil.addRequestInterceptor(requestInterceptor); + return builder; + } + + static class CredentialsResolver { + + public volatile SolrParams defaultParams; + + public CredentialsResolver() { + final String credentials = System.getProperty(PreemptiveBasicAuthClientBuilderFactory.SYS_PROP_BASIC_AUTH_CREDENTIALS); + final String configFile = System.getProperty(PreemptiveBasicAuthClientBuilderFactory.SYS_PROP_HTTP_CLIENT_CONFIG); + + if (credentials != null && configFile != null) { + throw new IllegalArgumentException("Basic authentication credentials passed via a configuration file" + " as well as java system property. Please choose one mechanism!"); + } + + if (credentials != null) { + final List<String> ss = StrUtils.splitSmart(credentials, ':'); + if (ss.size() != 2 || StringUtils.isEmpty(ss.get(0)) || StringUtils.isEmpty(ss.get(1))) { + throw new IllegalArgumentException("Invalid Authentication credentials: Please provide 'basicauth' in the 'user:password' format"); + } + final Map<String, String> paramMap = new HashMap<>(); + paramMap.put(HttpClientUtil.PROP_BASIC_AUTH_USER, ss.get(0)); + paramMap.put(HttpClientUtil.PROP_BASIC_AUTH_PASS, ss.get(1)); + defaultParams = new MapSolrParams(paramMap); + } else if (configFile != null) { + final Properties defaultProps = new Properties(); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(configFile), StandardCharsets.UTF_8)) { + defaultProps.load(reader); + } catch (final IOException e) { + throw new IllegalArgumentException("Unable to read credentials file at " + configFile, e); + } + final Map<String, String> map = new HashMap<>(); + defaultProps.forEach((k, v) -> map.put((String) k, (String) v)); + defaultParams = new MapSolrParams(map); + } else { + defaultParams = null; + } + } + } +} Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java?rev=1909097&view=auto ============================================================================== --- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java (added) +++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedUpdateRequest.java Wed Apr 12 14:35:38 2023 @@ -0,0 +1,520 @@ +package org.apache.manifoldcf.agents.output.solr; + +import static org.apache.solr.common.params.ShardParams._ROUTE_; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.XML; + +public class ModifiedUpdateRequest extends AbstractUpdateRequest { + + public static final String REPFACT = "rf"; + public static final String VER = "ver"; + public static final String OVERWRITE = "ow"; + public static final String COMMIT_WITHIN = "cw"; + private Map<SolrInputDocument, Map<String, Object>> documents = null; + private Iterator<SolrInputDocument> docIterator = null; + private Map<String, Map<String, Object>> deleteById = null; + private List<String> deleteQuery = null; + + private boolean isLastDocInBatch = false; + + public ModifiedUpdateRequest() { + super(METHOD.POST, "/update"); + } + + public ModifiedUpdateRequest(final String url) { + super(METHOD.POST, url); + } + + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- + + /** clear the pending documents and delete commands */ + public void clear() { + if (documents != null) { + documents.clear(); + } + if (deleteById != null) { + deleteById.clear(); + } + if (deleteQuery != null) { + deleteQuery.clear(); + } + } + + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- + + /** + * Add a SolrInputDocument to this request + * + * @throws NullPointerException if the document is null + */ + public ModifiedUpdateRequest add(final SolrInputDocument doc) { + Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument"); + if (documents == null) { + documents = new LinkedHashMap<>(); + } + documents.put(doc, null); + return this; + } + + public ModifiedUpdateRequest add(final String... fields) { + return add(new SolrInputDocument(fields)); + } + + /** + * Add a SolrInputDocument to this request + * + * @param doc the document + * @param overwrite true if the document should overwrite existing docs with the same id + * @throws NullPointerException if the document is null + */ + public ModifiedUpdateRequest add(final SolrInputDocument doc, final Boolean overwrite) { + return add(doc, null, overwrite); + } + + /** + * Add a SolrInputDocument to this request + * + * @param doc the document + * @param commitWithin the time horizon by which the document should be committed (in ms) + * @throws NullPointerException if the document is null + */ + public ModifiedUpdateRequest add(final SolrInputDocument doc, final Integer commitWithin) { + return add(doc, commitWithin, null); + } + + /** + * Add a SolrInputDocument to this request + * + * @param doc the document + * @param commitWithin the time horizon by which the document should be committed (in ms) + * @param overwrite true if the document should overwrite existing docs with the same id + * @throws NullPointerException if the document is null + */ + public ModifiedUpdateRequest add(final SolrInputDocument doc, final Integer commitWithin, final Boolean overwrite) { + Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument"); + if (documents == null) { + documents = new LinkedHashMap<>(); + } + final Map<String, Object> params = new HashMap<>(2); + if (commitWithin != null) + params.put(COMMIT_WITHIN, commitWithin); + if (overwrite != null) + params.put(OVERWRITE, overwrite); + + documents.put(doc, params); + + return this; + } + + /** + * Add a collection of SolrInputDocuments to this request + * + * @throws NullPointerException if any of the documents in the collection are null + */ + public ModifiedUpdateRequest add(final Collection<SolrInputDocument> docs) { + if (documents == null) { + documents = new LinkedHashMap<>(); + } + for (final SolrInputDocument doc : docs) { + Objects.requireNonNull(doc, "Cannot add a null SolrInputDocument"); + documents.put(doc, null); + } + return this; + } + + public ModifiedUpdateRequest deleteById(final String id) { + if (deleteById == null) { + deleteById = new LinkedHashMap<>(); + } + deleteById.put(id, null); + return this; + } + + public ModifiedUpdateRequest deleteById(final String id, final String route) { + return deleteById(id, route, null); + } + + public ModifiedUpdateRequest deleteById(final String id, final String route, final Long version) { + if (deleteById == null) { + deleteById = new LinkedHashMap<>(); + } + final Map<String, Object> params = (route == null && version == null) ? null : new HashMap<>(1); + if (version != null) + params.put(VER, version); + if (route != null) + params.put(_ROUTE_, route); + deleteById.put(id, params); + return this; + } + + public ModifiedUpdateRequest deleteById(final List<String> ids) { + if (deleteById == null) { + deleteById = new LinkedHashMap<>(); + } + + for (final String id : ids) { + deleteById.put(id, null); + } + + return this; + } + + public ModifiedUpdateRequest deleteById(final String id, final Long version) { + return deleteById(id, null, version); + } + + public ModifiedUpdateRequest deleteByQuery(final String q) { + if (deleteQuery == null) { + deleteQuery = new ArrayList<>(); + } + deleteQuery.add(q); + return this; + } + + public ModifiedUpdateRequest withRoute(final String route) { + if (params == null) + params = new ModifiableSolrParams(); + params.set(_ROUTE_, route); + return this; + } + + public UpdateResponse commit(final SolrClient client, final String collection) throws IOException, SolrServerException { + if (params == null) + params = new ModifiableSolrParams(); + params.set(UpdateParams.COMMIT, "true"); + return process(client, collection); + } + + private interface ReqSupplier<T extends ModifiedLBSolrClient.Req> { + T get(ModifiedUpdateRequest request, List<String> servers); + } + + private <T extends ModifiedLBSolrClient.Req> Map<String, T> getRoutes(final DocRouter router, final DocCollection col, final Map<String, List<String>> urlMap, final ModifiableSolrParams params, + final String idField, final ReqSupplier<T> reqSupplier) { + if ((documents == null || documents.size() == 0) && (deleteById == null || deleteById.size() == 0)) { + return null; + } + + final Map<String, T> routes = new HashMap<>(); + if (documents != null) { + final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = documents.entrySet(); + for (final Entry<SolrInputDocument, Map<String, Object>> entry : entries) { + final SolrInputDocument doc = entry.getKey(); + final Object id = doc.getFieldValue(idField); + if (id == null) { + return null; + } + final Slice slice = router.getTargetSlice(id.toString(), doc, null, null, col); + if (slice == null) { + return null; + } + final List<String> urls = urlMap.get(slice.getName()); + if (urls == null) { + return null; + } + final String leaderUrl = urls.get(0); + T request = routes.get(leaderUrl); + if (request == null) { + final ModifiedUpdateRequest updateRequest = new ModifiedUpdateRequest(); + updateRequest.setMethod(getMethod()); + updateRequest.setCommitWithin(getCommitWithin()); + updateRequest.setParams(params); + updateRequest.setPath(getPath()); + updateRequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword()); + updateRequest.setResponseParser(getResponseParser()); + request = reqSupplier.get(updateRequest, urls); + routes.put(leaderUrl, request); + } + final ModifiedUpdateRequest urequest = (ModifiedUpdateRequest) request.getRequest(); + final Map<String, Object> value = entry.getValue(); + Boolean ow = null; + if (value != null) { + ow = (Boolean) value.get(OVERWRITE); + } + if (ow != null) { + urequest.add(doc, ow); + } else { + urequest.add(doc); + } + } + } + + // Route the deleteById's + + if (deleteById != null) { + + final Iterator<Map.Entry<String, Map<String, Object>>> entries = deleteById.entrySet().iterator(); + while (entries.hasNext()) { + + final Map.Entry<String, Map<String, Object>> entry = entries.next(); + + final String deleteId = entry.getKey(); + final Map<String, Object> map = entry.getValue(); + Long version = null; + String route = null; + if (map != null) { + version = (Long) map.get(VER); + route = (String) map.get(_ROUTE_); + } + final Slice slice = router.getTargetSlice(deleteId, null, route, null, col); + if (slice == null) { + return null; + } + final List<String> urls = urlMap.get(slice.getName()); + if (urls == null) { + return null; + } + final String leaderUrl = urls.get(0); + T request = routes.get(leaderUrl); + if (request != null) { + final ModifiedUpdateRequest urequest = (ModifiedUpdateRequest) request.getRequest(); + urequest.deleteById(deleteId, route, version); + } else { + final ModifiedUpdateRequest urequest = new ModifiedUpdateRequest(); + urequest.setParams(params); + urequest.deleteById(deleteId, route, version); + urequest.setCommitWithin(getCommitWithin()); + urequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword()); + request = reqSupplier.get(urequest, urls); + routes.put(leaderUrl, request); + } + } + } + + return routes; + } + + /** + * @param router to route updates with + * @param col DocCollection for the updates + * @param urlMap of the cluster + * @param params params to use + * @param idField the id field + * @return a Map of urls to requests + */ + public Map<String, ModifiedLBSolrClient.Req> getRoutesToCollection(final DocRouter router, final DocCollection col, final Map<String, List<String>> urlMap, final ModifiableSolrParams params, + final String idField) { + return getRoutes(router, col, urlMap, params, idField, ModifiedLBSolrClient.Req::new); + } + + public void setDocIterator(final Iterator<SolrInputDocument> docIterator) { + this.docIterator = docIterator; + } + + public void setDeleteQuery(final List<String> deleteQuery) { + this.deleteQuery = deleteQuery; + } + + // -------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + + @Override + public Collection<ContentStream> getContentStreams() throws IOException { + return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML); + } + + public String getXML() throws IOException { + final StringWriter writer = new StringWriter(); + writeXML(writer); + writer.flush(); + + // If action is COMMIT or OPTIMIZE, it is sent with params + final String xml = writer.toString(); + // System.out.println( "SEND:"+xml ); + return (xml.length() > 0) ? xml : null; + } + + private List<Map<SolrInputDocument, Map<String, Object>>> getDocLists(final Map<SolrInputDocument, Map<String, Object>> documents) { + final List<Map<SolrInputDocument, Map<String, Object>>> docLists = new ArrayList<>(); + Map<SolrInputDocument, Map<String, Object>> docList = null; + if (this.documents != null) { + + Boolean lastOverwrite = true; + Integer lastCommitWithin = -1; + + final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = this.documents.entrySet(); + for (final Entry<SolrInputDocument, Map<String, Object>> entry : entries) { + final Map<String, Object> map = entry.getValue(); + Boolean overwrite = null; + Integer commitWithin = null; + if (map != null) { + overwrite = (Boolean) entry.getValue().get(OVERWRITE); + commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN); + } + if (!Objects.equals(overwrite, lastOverwrite) || !Objects.equals(commitWithin, lastCommitWithin) || docLists.isEmpty()) { + docList = new LinkedHashMap<>(); + docLists.add(docList); + } + docList.put(entry.getKey(), entry.getValue()); + lastCommitWithin = commitWithin; + lastOverwrite = overwrite; + } + } + + if (docIterator != null) { + docList = new LinkedHashMap<>(); + docLists.add(docList); + while (docIterator.hasNext()) { + final SolrInputDocument doc = docIterator.next(); + if (doc != null) { + docList.put(doc, null); + } + } + } + + return docLists; + } + + /** + * @since solr 1.4 + */ + public ModifiedUpdateRequest writeXML(final Writer writer) throws IOException { + final List<Map<SolrInputDocument, Map<String, Object>>> getDocLists = getDocLists(documents); + + for (final Map<SolrInputDocument, Map<String, Object>> docs : getDocLists) { + + if ((docs != null && docs.size() > 0)) { + final Entry<SolrInputDocument, Map<String, Object>> firstDoc = docs.entrySet().iterator().next(); + final Map<String, Object> map = firstDoc.getValue(); + Integer cw = null; + Boolean ow = null; + if (map != null) { + cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN); + ow = (Boolean) firstDoc.getValue().get(OVERWRITE); + } + if (ow == null) + ow = true; + final int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin; + final boolean overwrite = ow; + if (commitWithin > -1 || overwrite != true) { + writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">"); + } else { + writer.write("<add>"); + } + + final Set<Entry<SolrInputDocument, Map<String, Object>>> entries = docs.entrySet(); + for (final Entry<SolrInputDocument, Map<String, Object>> entry : entries) { + ClientUtils.writeXML(entry.getKey(), writer); + } + + writer.write("</add>"); + } + } + + // Add the delete commands + final boolean deleteI = deleteById != null && deleteById.size() > 0; + final boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0; + if (deleteI || deleteQ) { + if (commitWithin > 0) { + writer.append("<delete commitWithin=\"").append(String.valueOf(commitWithin)).append("\">"); + } else { + writer.append("<delete>"); + } + if (deleteI) { + for (final Map.Entry<String, Map<String, Object>> entry : deleteById.entrySet()) { + writer.append("<id"); + final Map<String, Object> map = entry.getValue(); + if (map != null) { + final Long version = (Long) map.get(VER); + final String route = (String) map.get(_ROUTE_); + if (version != null) { + writer.append(" version=\"").append(String.valueOf(version)).append('"'); + } + + if (route != null) { + writer.append(" _route_=\"").append(route).append('"'); + } + } + writer.append(">"); + + XML.escapeCharData(entry.getKey(), writer); + writer.append("</id>"); + } + } + if (deleteQ) { + for (final String q : deleteQuery) { + writer.append("<query>"); + XML.escapeCharData(q, writer); + writer.append("</query>"); + } + } + writer.append("</delete>"); + } + return this; + } + + // -------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + + // -------------------------------------------------------------------------- + // + // -------------------------------------------------------------------------- + + public List<SolrInputDocument> getDocuments() { + if (documents == null) + return null; + final List<SolrInputDocument> docs = new ArrayList<>(documents.size()); + docs.addAll(documents.keySet()); + return docs; + } + + public Map<SolrInputDocument, Map<String, Object>> getDocumentsMap() { + return documents; + } + + public Iterator<SolrInputDocument> getDocIterator() { + return docIterator; + } + + public List<String> getDeleteById() { + if (deleteById == null) + return null; + final List<String> deletes = new ArrayList<>(deleteById.keySet()); + return deletes; + } + + public Map<String, Map<String, Object>> getDeleteByIdMap() { + return deleteById; + } + + public List<String> getDeleteQuery() { + return deleteQuery; + } + + public boolean isLastDocInBatch() { + return isLastDocInBatch; + } + + public void lastDocInBatch() { + isLastDocInBatch = true; + } + +} Modified: manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml?rev=1909097&r1=1909096&r2=1909097&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml (original) +++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/pom.xml Wed Apr 12 14:35:38 2023 @@ -244,6 +244,24 @@ <version>${slf4j.version}</version> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-client</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-http-client-transport</artifactId> + <version>${jetty.version}</version> + </dependency> + + + + + + + + <!-- Testing dependencies --> <dependency> @@ -325,7 +343,6 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> @@ -343,7 +360,6 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> <version>${jetty.version}</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> Modified: manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java?rev=1909097&r1=1909096&r2=1909097&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java (original) +++ manifoldcf/branches/CONNECTORS-1740/framework/core/src/test/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperInstance.java Wed Apr 12 14:35:38 2023 @@ -18,31 +18,31 @@ */ package org.apache.manifoldcf.core.lockmanager; -import java.util.*; -import java.io.*; -import org.apache.zookeeper.server.*; -import org.apache.zookeeper.server.quorum.*; +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -public class ZooKeeperInstance -{ +public class ZooKeeperInstance { protected final int zkPort; protected final File tempDir; - + protected ZooKeeperThread zookeeperThread = null; - - public ZooKeeperInstance(int zkPort, File tempDir) - { + + public ZooKeeperInstance(final int zkPort, final File tempDir) { this.zkPort = zkPort; this.tempDir = tempDir; } - public void start() - throws Exception - { - Properties startupProperties = new Properties(); - startupProperties.setProperty("tickTime","2000"); - startupProperties.setProperty("dataDir",tempDir.toString()); - startupProperties.setProperty("clientPort",Integer.toString(zkPort)); + public void start() throws Exception { + final Properties startupProperties = new Properties(); + startupProperties.setProperty("tickTime", "2000"); + startupProperties.setProperty("dataDir", tempDir.toString()); + startupProperties.setProperty("clientPort", Integer.toString(zkPort)); + startupProperties.setProperty("admin.enableServer", "false"); final QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); quorumConfiguration.parseProperties(startupProperties); @@ -53,33 +53,26 @@ public class ZooKeeperInstance zookeeperThread = new ZooKeeperThread(configuration); zookeeperThread.start(); // We have no way of knowing whether zookeeper is alive or not, but the - // client is supposed to know about that. But it doesn't, so wait for 5 seconds + // client is supposed to know about that. But it doesn't, so wait for 5 seconds Thread.sleep(5000L); } - - public void stop() - throws Exception - { - while (true) - { + + public void stop() throws Exception { + while (true) { if (zookeeperThread == null) break; - else if (!zookeeperThread.isAlive()) - { - Throwable e = zookeeperThread.finishUp(); - if (e != null) - { + else if (!zookeeperThread.isAlive()) { + final Throwable e = zookeeperThread.finishUp(); + if (e != null) { if (e instanceof RuntimeException) - throw (RuntimeException)e; + throw (RuntimeException) e; else if (e instanceof Exception) - throw (Exception)e; + throw (Exception) e; else if (e instanceof Error) - throw (Error)e; + throw (Error) e; } zookeeperThread = null; - } - else - { + } else { // This isn't the best way to kill zookeeper but it's the only way // we've got. zookeeperThread.interrupt(); @@ -87,42 +80,33 @@ public class ZooKeeperInstance } } } - - protected static class ZooKeeperThread extends Thread - { + + protected static class ZooKeeperThread extends Thread { protected final ServerConfig config; - + protected Throwable exception = null; - - public ZooKeeperThread(ServerConfig config) - { + + public ZooKeeperThread(final ServerConfig config) { this.config = config; } - - public void run() - { - try - { - ZooKeeperServerMain server = new ZooKeeperServerMain(); + + @Override + public void run() { + try { + final ZooKeeperServerMain server = new ZooKeeperServerMain(); server.runFromConfig(config); - } - catch (IOException e) - { + } catch (final IOException e) { // Ignore IOExceptions, since that seems to be normal when shutting // down zookeeper via thread.interrupt() - } - catch (Throwable e) - { + } catch (final Throwable e) { exception = e; } } - - public Throwable finishUp() - throws InterruptedException - { + + public Throwable finishUp() throws InterruptedException { join(); return exception; } } - + }
