http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java new file mode 100644 index 0000000..8272813 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -0,0 +1,1232 @@ +package org.apache.solr.client.solrj.impl; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpClient; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.solr.client.solrj.ResponseParser; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.request.IsUpdateRequest; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.Aliases; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.ImplicitDocRouter; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.Hash; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.apache.solr.common.util.StrUtils; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * SolrJ client class to communicate with SolrCloud. + * Instances of this class communicate with Zookeeper to discover + * Solr endpoints for SolrCloud collections, and then use the + * {@link LBHttpSolrClient} to issue requests. + * + * This class assumes the id field for your documents is called + * 'id' - if this is not the case, you must set the right name + * with {@link #setIdField(String)}. + */ +@SuppressWarnings("serial") +public class CloudSolrClient extends SolrClient { + protected static final Logger log = LoggerFactory.getLogger(CloudSolrClient.class); + + private volatile ZkStateReader zkStateReader; + private String zkHost; // the zk server connect string + private int zkConnectTimeout = 10000; + private int zkClientTimeout = 10000; + private volatile String defaultCollection; + private final LBHttpSolrClient lbClient; + private final boolean shutdownLBHttpSolrServer; + private HttpClient myClient; + private final boolean clientIsInternal; + // number of times the collection state is reloaded if a stale state error is received + private static final int MAX_STALE_RETRIES = 5; + Random rand = new Random(); + + private final boolean updatesToLeaders; + private boolean parallelUpdates = true; + private ExecutorService threadPool = Executors + .newCachedThreadPool(new SolrjNamedThreadFactory( + "CloudSolrServer ThreadPool")); + private String idField = "id"; + public static final String STATE_VERSION = "_stateVer_"; + private final Set<String> NON_ROUTABLE_PARAMS; + { + NON_ROUTABLE_PARAMS = new HashSet<>(); + NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES); + NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS); + NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER); + NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER); + + NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE); + + // Not supported via SolrCloud + // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); + + } + private volatile long timeToLive = 60 * 1000L; + private volatile List<Object> locks = objectList(3); + + + protected final Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){ + @Override + public ExpiringCachedDocCollection get(Object key) { + ExpiringCachedDocCollection val = super.get(key); + if(val == null) return null; + if(val.isExpired(timeToLive)) { + super.remove(key); + return null; + } + return val; + } + + }; + + class ExpiringCachedDocCollection { + final DocCollection cached; + long cachedAt; + + ExpiringCachedDocCollection(DocCollection cached) { + this.cached = cached; + this.cachedAt = System.currentTimeMillis(); + } + + boolean isExpired(long timeToLive) { + return (System.currentTimeMillis() - cachedAt) > timeToLive; + } + } + + /** + * Create a new client object that connects to Zookeeper and is always aware + * of the SolrCloud state.
If there is a fully redundant Zookeeper quorum and + * SolrCloud has enough replicas for every shard in a collection, there is no + * single point of failure. Updates will be sent to shard leaders by default. + * + * @param zkHost + * The client endpoint of the zookeeper quorum containing the cloud + * state. The full specification for this string is one or more comma + * separated HOST:PORT values, followed by an optional chroot value + * that starts with a forward slash. Using a chroot allows multiple + * applications to coexist in one ensemble. For full details, see the + * Zookeeper documentation. Some examples: + * <p> + * "host1:2181" + * <p> + * "host1:2181,host2:2181,host3:2181/mysolrchroot" + * <p> + * "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181" + */ + public CloudSolrClient(String zkHost) { + this.zkHost = zkHost; + this.clientIsInternal = true; + this.myClient = HttpClientUtil.createClient(null); + this.lbClient = new LBHttpSolrClient(myClient); + this.lbClient.setRequestWriter(new BinaryRequestWriter()); + this.lbClient.setParser(new BinaryResponseParser()); + this.updatesToLeaders = true; + shutdownLBHttpSolrServer = true; + lbClient.addQueryParams(STATE_VERSION); + } + + /** + * Create a new client object that connects to Zookeeper and is always aware + * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and + * SolrCloud has enough replicas for every shard in a collection, there is no + * single point of failure. Updates will be sent to shard leaders by default. + * + * @param zkHost + * The client endpoint of the zookeeper quorum containing the cloud + * state. The full specification for this string is one or more comma + * separated HOST:PORT values, followed by an optional chroot value + * that starts with a forward slash. Using a chroot allows multiple + * applications to coexist in one ensemble. For full details, see the + * Zookeeper documentation. Some examples: + * <p> + * "host1:2181" + * <p> + * "host1:2181,host2:2181,host3:2181/mysolrchroot" + * <p> + * "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181" + * @param httpClient + * the {@link HttpClient} instance to be used for all requests. The + * provided httpClient should use a multi-threaded connection manager. + */ + public CloudSolrClient(String zkHost, HttpClient httpClient) { + this.zkHost = zkHost; + this.clientIsInternal = httpClient == null; + this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; + this.lbClient = new LBHttpSolrClient(myClient); + this.lbClient.setRequestWriter(new BinaryRequestWriter()); + this.lbClient.setParser(new BinaryResponseParser()); + this.updatesToLeaders = true; + shutdownLBHttpSolrServer = true; + lbClient.addQueryParams(STATE_VERSION); + } + + /** + * Create a new client object using multiple string values in a Collection + * instead of a standard zkHost connection string. Note that this method will + * not be used if there is only one String argument - that will use + * {@link #CloudSolrClient(String)} instead. + * + * @param zkHosts + * A Java Collection (List, Set, etc) of HOST:PORT strings, one for + * each host in the zookeeper ensemble. Note that with certain + * Collection types like HashSet, the order of hosts in the final + * connect string may not be in the same order you added them. + * @param chroot + * A chroot value for zookeeper, starting with a forward slash. If no + * chroot is required, use null. 
+ * @throws IllegalArgumentException + * if the chroot value does not start with a forward slash. + * @see #CloudSolrClient(String) + */ + public CloudSolrClient(Collection<String> zkHosts, String chroot) { + this(zkHosts, chroot, null); + } + + /** + * Create a new client object using multiple string values in a Collection + * instead of a standard zkHost connection string. Note that this method will + * not be used if there is only one String argument - that will use + * {@link #CloudSolrClient(String)} instead. + * + * @param zkHosts + * A Java Collection (List, Set, etc) of HOST:PORT strings, one for + * each host in the zookeeper ensemble. Note that with certain + * Collection types like HashSet, the order of hosts in the final + * connect string may not be in the same order you added them. + * @param chroot + * A chroot value for zookeeper, starting with a forward slash. If no + * chroot is required, use null. + * @param httpClient + * the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a + * multi-threaded connection manager. + * @throws IllegalArgumentException + * if the chroot value does not start with a forward slash. + * @see #CloudSolrClient(String) + */ + public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) { + StringBuilder zkBuilder = new StringBuilder(); + int lastIndexValue = zkHosts.size() - 1; + int i = 0; + for (String zkHost : zkHosts) { + zkBuilder.append(zkHost); + if (i < lastIndexValue) { + zkBuilder.append(","); + } + i++; + } + if (chroot != null) { + if (chroot.startsWith("/")) { + zkBuilder.append(chroot); + } else { + throw new IllegalArgumentException( + "The chroot must start with a forward slash."); + } + } + + /* Log the constructed connection string and then initialize. */ + log.info("Final constructed zkHost string: " + zkBuilder.toString()); + + this.zkHost = zkBuilder.toString(); + this.clientIsInternal = httpClient == null; + this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; + this.lbClient = new LBHttpSolrClient(myClient); + this.lbClient.setRequestWriter(new BinaryRequestWriter()); + this.lbClient.setParser(new BinaryResponseParser()); + this.updatesToLeaders = true; + shutdownLBHttpSolrServer = true; + } + + /** + * @param zkHost + * A zookeeper client endpoint. + * @param updatesToLeaders + * If true, sends updates only to shard leaders. + * @see #CloudSolrClient(String) for full description and details on zkHost + */ + public CloudSolrClient(String zkHost, boolean updatesToLeaders) { + this(zkHost, updatesToLeaders, null); + } + + /** + * @param zkHost + * A zookeeper client endpoint. + * @param updatesToLeaders + * If true, sends updates only to shard leaders. + * @param httpClient + * the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a + * multi-threaded connection manager. + * @see #CloudSolrClient(String) for full description and details on zkHost + */ + public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) { + this.zkHost = zkHost; + this.clientIsInternal = httpClient == null; + this.myClient = httpClient == null ? 
HttpClientUtil.createClient(null) : httpClient; + this.lbClient = new LBHttpSolrClient(myClient); + this.lbClient.setRequestWriter(new BinaryRequestWriter()); + this.lbClient.setParser(new BinaryResponseParser()); + this.updatesToLeaders = updatesToLeaders; + shutdownLBHttpSolrServer = true; + lbClient.addQueryParams(STATE_VERSION); + } + + /** Sets the cache TTL for cached DocCollection objects. This is only applicable to collections that are persisted outside of clusterstate.json. + * @param seconds TTL value in seconds + */ + public void setCollectionCacheTTl(int seconds){ + assert seconds > 0; + timeToLive = seconds*1000L; + } + + /** + * @param zkHost + * A zookeeper client endpoint. + * @param lbClient + * LBHttpSolrServer instance for requests. + * @see #CloudSolrClient(String) for full description and details on zkHost + */ + public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient) { + this(zkHost, lbClient, true); + } + + /** + * @param zkHost + * A zookeeper client endpoint. + * @param lbClient + * LBHttpSolrServer instance for requests. + * @param updatesToLeaders + * If true, sends updates only to shard leaders. + * @see #CloudSolrClient(String) for full description and details on zkHost + */ + public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) { + this.zkHost = zkHost; + this.lbClient = lbClient; + this.updatesToLeaders = updatesToLeaders; + shutdownLBHttpSolrServer = false; + this.clientIsInternal = false; + lbClient.addQueryParams(STATE_VERSION); + } + + public ResponseParser getParser() { + return lbClient.getParser(); + } + + /** + * Note: This setter method is <b>not thread-safe</b>. + * + * @param processor + * Default Response Parser chosen to parse the response if the parser + * was not specified as part of the request. + * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser() + */ + public void setParser(ResponseParser processor) { + lbClient.setParser(processor); + } + + public RequestWriter getRequestWriter() { + return lbClient.getRequestWriter(); + } + + public void setRequestWriter(RequestWriter requestWriter) { + lbClient.setRequestWriter(requestWriter); + } + + /** + * @return the zkHost value used to connect to zookeeper. + */ + public String getZkHost() { + return zkHost; + } + + public ZkStateReader getZkStateReader() { + return zkStateReader; + } + + /** + * @param idField the field to route documents on. + */ + public void setIdField(String idField) { + this.idField = idField; + } + + /** + * @return the field that updates are routed on. + */ + public String getIdField() { + return idField; + } + + /** Sets the default collection for requests */ + public void setDefaultCollection(String collection) { + this.defaultCollection = collection; + } + + /** Gets the default collection for requests */ + public String getDefaultCollection() { + return defaultCollection; + } + + /** Set the connect timeout to the zookeeper ensemble in ms */ + public void setZkConnectTimeout(int zkConnectTimeout) { + this.zkConnectTimeout = zkConnectTimeout; + } + + /** Set the session timeout to the zookeeper ensemble in ms */ + public void setZkClientTimeout(int zkClientTimeout) { + this.zkClientTimeout = zkClientTimeout; + } + + /** + * Connect to the zookeeper ensemble. + * This is an optional method that may be used to force a connect before any other requests are sent.
+ * + */ + public void connect() { + if (zkStateReader == null) { + synchronized (this) { + if (zkStateReader == null) { + ZkStateReader zk = null; + try { + zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout); + zk.createClusterStateWatchersAndUpdate(); + zkStateReader = zk; + } catch (InterruptedException e) { + zk.close(); + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (KeeperException e) { + zk.close(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (Exception e) { + if (zk != null) zk.close(); + // do not wrap because clients may be relying on the underlying exception being thrown + throw e; + } + } + } + } + } + + public void setParallelUpdates(boolean parallelUpdates) { + this.parallelUpdates = parallelUpdates; + } + + /** + * Upload a set of config files to Zookeeper and give it a name + * + * NOTE: You should only allow trusted users to upload configs. If you + * are allowing client access to zookeeper, you should protect the + * /configs node against unauthorised write access. + * + * @param configPath {@link java.nio.file.Path} to the config files + * @param configName the name of the config + * @throws IOException if an IO error occurs + */ + public void uploadConfig(Path configPath, String configName) throws IOException { + zkStateReader.getConfigManager().uploadConfigDir(configPath, configName); + } + + /** + * Download a named config from Zookeeper to a location on the filesystem + * @param configName the name of the config + * @param downloadPath the path to write config files to + * @throws IOException if an I/O exception occurs + */ + public void downloadConfig(String configName, Path downloadPath) throws IOException { + zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath); + } + + private NamedList<Object> directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException { + UpdateRequest updateRequest = (UpdateRequest) request; + ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); + ModifiableSolrParams routableParams = new ModifiableSolrParams(); + ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams(); + + if(params != null) { + nonRoutableParams.add(params); + routableParams.add(params); + for(String param : NON_ROUTABLE_PARAMS) { + routableParams.remove(param); + } + } + + String collection = nonRoutableParams.get(UpdateParams.COLLECTION, defaultCollection); + if (collection == null) { + throw new SolrServerException("No collection param specified on request and no default collection has been set."); + } + + + //Check to see if the collection is an alias. + Aliases aliases = zkStateReader.getAliases(); + if(aliases != null) { + Map<String, String> collectionAliases = aliases.getCollectionAliasMap(); + if(collectionAliases != null && collectionAliases.containsKey(collection)) { + collection = collectionAliases.get(collection); + } + } + + DocCollection col = getDocCollection(clusterState, collection,null); + + DocRouter router = col.getRouter(); + + if (router instanceof ImplicitDocRouter) { + // short circuit as optimization + return null; + } + + //Create the URL map, which is keyed on slice name. + //The value is a list of URLs for each replica in the slice. + //The first value in the list is the leader for the slice. 
+ Map<String,List<String>> urlMap = buildUrlMap(col); + if (urlMap == null) { + // we could not find a leader yet - use unoptimized general path + return null; + } + + NamedList<Throwable> exceptions = new NamedList<>(); + NamedList<NamedList> shardResponses = new NamedList<>(); + + Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField); + if (routes == null) { + return null; + } + + long start = System.nanoTime(); + + if (parallelUpdates) { + final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBHttpSolrClient.Req lbRequest = entry.getValue(); + responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() { + @Override + public NamedList<?> call() throws Exception { + return lbClient.request(lbRequest).getResponse(); + } + })); + } + + for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { + final String url = entry.getKey(); + final Future<NamedList<?>> responseFuture = entry.getValue(); + try { + shardResponses.add(url, responseFuture.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + exceptions.add(url, e.getCause()); + } + } + + if (exceptions.size() > 0) { + throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes); + } + } else { + for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) { + String url = entry.getKey(); + LBHttpSolrClient.Req lbRequest = entry.getValue(); + try { + NamedList<Object> rsp = lbClient.request(lbRequest).getResponse(); + shardResponses.add(url, rsp); + } catch (Exception e) { + throw new SolrServerException(e); + } + } + } + + UpdateRequest nonRoutableRequest = null; + List<String> deleteQuery = updateRequest.getDeleteQuery(); + if (deleteQuery != null && deleteQuery.size() > 0) { + UpdateRequest deleteQueryRequest = new UpdateRequest(); + deleteQueryRequest.setDeleteQuery(deleteQuery); + nonRoutableRequest = deleteQueryRequest; + } + + Set<String> paramNames = nonRoutableParams.getParameterNames(); + + Set<String> intersection = new HashSet<>(paramNames); + intersection.retainAll(NON_ROUTABLE_PARAMS); + + if (nonRoutableRequest != null || intersection.size() > 0) { + if (nonRoutableRequest == null) { + nonRoutableRequest = new UpdateRequest(); + } + nonRoutableRequest.setParams(nonRoutableParams); + List<String> urlList = new ArrayList<>(); + urlList.addAll(routes.keySet()); + Collections.shuffle(urlList, rand); + LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList); + try { + LBHttpSolrClient.Rsp rsp = lbClient.request(req); + shardResponses.add(urlList.get(0), rsp.getResponse()); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e); + } + } + + long end = System.nanoTime(); + + RouteResponse rr = condenseResponse(shardResponses, (long)((end - start)/1000000)); + rr.setRouteResponses(shardResponses); + rr.setRoutes(routes); + return rr; + } + + private Map<String,List<String>> buildUrlMap(DocCollection col) { + Map<String, List<String>> urlMap = new HashMap<>(); + Collection<Slice> slices = col.getActiveSlices(); + Iterator<Slice> sliceIterator = slices.iterator(); + while (sliceIterator.hasNext()) { + Slice slice = sliceIterator.next(); + String name = slice.getName(); + List<String> urls = 
new ArrayList<>(); + Replica leader = slice.getLeader(); + if (leader == null) { + // take unoptimized general path - we cannot find a leader yet + return null; + } + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); + String url = zkProps.getCoreUrl(); + urls.add(url); + Collection<Replica> replicas = slice.getReplicas(); + Iterator<Replica> replicaIterator = replicas.iterator(); + while (replicaIterator.hasNext()) { + Replica replica = replicaIterator.next(); + if (!replica.getNodeName().equals(leader.getNodeName()) && + !replica.getName().equals(leader.getName())) { + ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica); + String url1 = zkProps1.getCoreUrl(); + urls.add(url1); + } + } + urlMap.put(name, urls); + } + return urlMap; + } + + public RouteResponse condenseResponse(NamedList response, long timeMillis) { + RouteResponse condensed = new RouteResponse(); + int status = 0; + Integer rf = null; + Integer minRf = null; + for(int i=0; i<response.size(); i++) { + NamedList shardResponse = (NamedList)response.getVal(i); + NamedList header = (NamedList)shardResponse.get("responseHeader"); + Integer shardStatus = (Integer)header.get("status"); + int s = shardStatus.intValue(); + if(s > 0) { + status = s; + } + Object rfObj = header.get(UpdateRequest.REPFACT); + if (rfObj != null && rfObj instanceof Integer) { + Integer routeRf = (Integer)rfObj; + if (rf == null || routeRf < rf) + rf = routeRf; + } + minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT); + } + + NamedList cheader = new NamedList(); + cheader.add("status", status); + cheader.add("QTime", timeMillis); + if (rf != null) + cheader.add(UpdateRequest.REPFACT, rf); + if (minRf != null) + cheader.add(UpdateRequest.MIN_REPFACT, minRf); + + condensed.add("responseHeader", cheader); + return condensed; + } + + public static class RouteResponse extends NamedList { + private NamedList routeResponses; + private Map<String, LBHttpSolrClient.Req> routes; + + public void setRouteResponses(NamedList routeResponses) { + this.routeResponses = routeResponses; + } + + public NamedList getRouteResponses() { + return routeResponses; + } + + public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) { + this.routes = routes; + } + + public Map<String, LBHttpSolrClient.Req> getRoutes() { + return routes; + } + + } + + public static class RouteException extends SolrException { + + private NamedList<Throwable> throwables; + private Map<String, LBHttpSolrClient.Req> routes; + + public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){ + super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0)); + this.throwables = throwables; + this.routes = routes; + } + + public NamedList<Throwable> getThrowables() { + return throwables; + } + + public Map<String, LBHttpSolrClient.Req> getRoutes() { + return this.routes; + } + } + + @Override + public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException { + SolrParams reqParams = request.getParams(); + String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection(); + return requestWithRetryOnStaleState(request, 0, collection); + } + + /** + * As this class doesn't watch external collections on the client side, + * there's a chance that the request will fail due to cached stale state, + * which means the state must be refreshed from ZK and retried. 
+ */ + protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection) + throws SolrServerException, IOException { + + connect(); // important to call this before you start working with the ZkStateReader + + // build up a _stateVer_ param to pass to the server containing all of the + // external collection state versions involved in this request, which allows + // the server to notify us that our cached state for one or more of the external + // collections is stale and needs to be refreshed ... this code has no impact on internal collections + String stateVerParam = null; + List<DocCollection> requestedCollections = null; + if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests + Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection); + + StringBuilder stateVerParamBuilder = null; + for (String requestedCollection : requestedCollectionNames) { + // track the version of state we're using on the client side using the _stateVer_ param + DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null); + int collVer = coll.getZNodeVersion(); + if (coll.getStateFormat()>1) { + if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size()); + requestedCollections.add(coll); + + if (stateVerParamBuilder == null) { + stateVerParamBuilder = new StringBuilder(); + } else { + stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name + } + + stateVerParamBuilder.append(coll.getName()).append(":").append(collVer); + } + } + + if (stateVerParamBuilder != null) { + stateVerParam = stateVerParamBuilder.toString(); + } + } + + if (request.getParams() instanceof ModifiableSolrParams) { + ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); + if (stateVerParam != null) { + params.set(STATE_VERSION, stateVerParam); + } else { + params.remove(STATE_VERSION); + } + } // else: ??? how to set this ??? + + NamedList<Object> resp = null; + try { + resp = sendRequest(request); + //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there + Object o = resp.get(STATE_VERSION, resp.size()-1); + if(o != null && o instanceof Map) { + //remove this because no one else needs this and tests would fail if they are comparing responses + resp.remove(resp.size()-1); + Map invalidStates = (Map) o; + for (Object invalidEntries : invalidStates.entrySet()) { + Map.Entry e = (Map.Entry) invalidEntries; + getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue()); + } + + } + } catch (Exception exc) { + + Throwable rootCause = SolrException.getRootCause(exc); + // don't do retry support for admin requests or if the request doesn't have a collection specified + if (collection == null || request.getPath().startsWith("/admin")) { + if (exc instanceof SolrServerException) { + throw (SolrServerException)exc; + } else if (exc instanceof IOException) { + throw (IOException)exc; + }else if (exc instanceof RuntimeException) { + throw (RuntimeException) exc; + } + else { + throw new SolrServerException(rootCause); + } + } + + int errorCode = (rootCause instanceof SolrException) ? + ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code; + + log.error("Request to collection {} failed due to ("+errorCode+ + ") {}, retry? 
"+retryCount, collection, rootCause.toString()); + + boolean wasCommError = + (rootCause instanceof ConnectException || + rootCause instanceof ConnectTimeoutException || + rootCause instanceof NoHttpResponseException || + rootCause instanceof SocketException); + + boolean stateWasStale = false; + if (retryCount < MAX_STALE_RETRIES && + requestedCollections != null && + !requestedCollections.isEmpty() && + SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE) + { + // cached state for one or more external collections was stale + // re-issue request using updated state + stateWasStale = true; + + // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence + for (DocCollection ext : requestedCollections) { + collectionStateCache.remove(ext.getName()); + } + } + + // if we experienced a communication error, it's worth checking the state + // with ZK just to make sure the node we're trying to hit is still part of the collection + if (retryCount < MAX_STALE_RETRIES && + !stateWasStale && + requestedCollections != null && + !requestedCollections.isEmpty() && + wasCommError) { + for (DocCollection ext : requestedCollections) { + DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null); + if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) { + // looks like we couldn't reach the server because the state was stale == retry + stateWasStale = true; + // we just pulled state from ZK, so update the cache so that the retry uses it + collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk)); + } + } + } + + if (requestedCollections != null) { + requestedCollections.clear(); // done with this + } + + // if the state was stale, then we retry the request once with new state pulled from Zk + if (stateWasStale) { + log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server."); + resp = requestWithRetryOnStaleState(request, retryCount+1, collection); + } else { + if (exc instanceof SolrServerException) { + throw (SolrServerException)exc; + } else if (exc instanceof IOException) { + throw (IOException)exc; + } else { + throw new SolrServerException(rootCause); + } + } + } + + return resp; + } + + protected NamedList<Object> sendRequest(SolrRequest request) + throws SolrServerException, IOException { + connect(); + + ClusterState clusterState = zkStateReader.getClusterState(); + + boolean sendToLeaders = false; + List<String> replicas = null; + + if (request instanceof IsUpdateRequest) { + if (request instanceof UpdateRequest) { + NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, + clusterState); + if (response != null) { + return response; + } + } + sendToLeaders = true; + replicas = new ArrayList<>(); + } + + SolrParams reqParams = request.getParams(); + if (reqParams == null) { + reqParams = new ModifiableSolrParams(); + } + List<String> theUrlList = new ArrayList<>(); + if (request.getPath().equals("/admin/collections") + || request.getPath().equals("/admin/cores")) { + Set<String> liveNodes = clusterState.getLiveNodes(); + for (String liveNode : liveNodes) { + theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode)); + } + } else { + String collection = reqParams.get(UpdateParams.COLLECTION, defaultCollection); + + if (collection == null) { + throw new SolrServerException( + "No collection param specified on request and no default collection has been set."); + } + + 
Set<String> collectionNames = getCollectionNames(clusterState, collection); + if (collectionNames.size() == 0) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Could not find collection: " + collection); + } + + String shardKeys = reqParams.get(ShardParams._ROUTE_); + + // TODO: not a big deal because of the caching, but we could avoid looking + // at every shard + // when getting leaders if we tweaked some things + + // Retrieve slices from the cloud state and, for each collection + // specified, + // add it to the Map of slices. + Map<String,Slice> slices = new HashMap<>(); + for (String collectionName : collectionNames) { + DocCollection col = getDocCollection(clusterState, collectionName, null); + Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); + ClientUtils.addSlices(slices, collectionName, routeSlices, true); + } + Set<String> liveNodes = clusterState.getLiveNodes(); + + List<String> leaderUrlList = null; + List<String> urlList = null; + List<String> replicasList = null; + + // build a map of unique nodes + // TODO: allow filtering by group, role, etc + Map<String,ZkNodeProps> nodes = new HashMap<>(); + List<String> urlList2 = new ArrayList<>(); + for (Slice slice : slices.values()) { + for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) { + ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps); + String node = coreNodeProps.getNodeName(); + if (!liveNodes.contains(coreNodeProps.getNodeName()) + || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue; + if (nodes.put(node, nodeProps) == null) { + if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) { + String url; + if (reqParams.get(UpdateParams.COLLECTION) == null) { + url = ZkCoreNodeProps.getCoreUrl( + nodeProps.getStr(ZkStateReader.BASE_URL_PROP), + defaultCollection); + } else { + url = coreNodeProps.getCoreUrl(); + } + urlList2.add(url); + } else if (sendToLeaders) { + String url; + if (reqParams.get(UpdateParams.COLLECTION) == null) { + url = ZkCoreNodeProps.getCoreUrl( + nodeProps.getStr(ZkStateReader.BASE_URL_PROP), + defaultCollection); + } else { + url = coreNodeProps.getCoreUrl(); + } + replicas.add(url); + } + } + } + } + + if (sendToLeaders) { + leaderUrlList = urlList2; + replicasList = replicas; + } else { + urlList = urlList2; + } + + if (sendToLeaders) { + theUrlList = new ArrayList<>(leaderUrlList.size()); + theUrlList.addAll(leaderUrlList); + } else { + theUrlList = new ArrayList<>(urlList.size()); + theUrlList.addAll(urlList); + } + if(theUrlList.isEmpty()) { + for (String s : collectionNames) { + if(s!=null) collectionStateCache.remove(s); + } + throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request"); + } + + Collections.shuffle(theUrlList, rand); + if (sendToLeaders) { + ArrayList<String> theReplicas = new ArrayList<>( + replicasList.size()); + theReplicas.addAll(replicasList); + Collections.shuffle(theReplicas, rand); + theUrlList.addAll(theReplicas); + } + + } + + LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList); + LBHttpSolrClient.Rsp rsp = lbClient.request(req); + return rsp.getResponse(); + } + + private Set<String> getCollectionNames(ClusterState clusterState, + String collection) { + // Extract each comma separated collection name and store in a List. 
+ List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true); + Set<String> collectionNames = new HashSet<>(); + // validate collections + for (String collectionName : rawCollectionsList) { + if (!clusterState.getCollections().contains(collectionName)) { + Aliases aliases = zkStateReader.getAliases(); + String alias = aliases.getCollectionAlias(collectionName); + if (alias != null) { + List<String> aliasList = StrUtils.splitSmart(alias, ",", true); + collectionNames.addAll(aliasList); + continue; + } + + throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName); + } + + collectionNames.add(collectionName); + } + return collectionNames; + } + + @Override + public void close() throws IOException { + shutdown(); + } + + @Override + @Deprecated + public void shutdown() { + if (zkStateReader != null) { + synchronized(this) { + if (zkStateReader!= null) + zkStateReader.close(); + zkStateReader = null; + } + } + + if (shutdownLBHttpSolrServer) { + lbClient.shutdown(); + } + + if (clientIsInternal && myClient!=null) { + HttpClientUtil.close(myClient); + } + + if(this.threadPool != null && !this.threadPool.isShutdown()) { + this.threadPool.shutdown(); + } + } + + public LBHttpSolrClient getLbClient() { + return lbClient; + } + + public boolean isUpdatesToLeaders() { + return updatesToLeaders; + } + + /** If caches are expired they are refreshed after acquiring a lock. + * Use this to set the number of locks. + */ + public void setParallelCacheRefreshes(int n){ locks = objectList(n); } + + private static ArrayList<Object> objectList(int n) { + ArrayList<Object> l = new ArrayList<>(n); + for(int i=0;i<n;i++) l.add(new Object()); + return l; + } + + + protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException { + if (collection == null) return null; + DocCollection col = getFromCache(collection); + if (col != null) { + if (expectedVersion == null) return col; + if (expectedVersion.intValue() == col.getZNodeVersion()) return col; + } + + ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection); + if (ref == null) { + // no such collection exists + return null; + } + if (!ref.isLazilyLoaded()) { + // it is readily available; just return it + return ref.get(); + } + List locks = this.locks; + final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size())); + synchronized (lock) { + // we have waited for some time; just check once again + col = getFromCache(collection); + if (col != null) { + if (expectedVersion == null) return col; + if (expectedVersion.intValue() == col.getZNodeVersion()) { + return col; + } else { + collectionStateCache.remove(collection); + } + } + col = ref.get(); // this is a call to ZK + } + if (col == null) return null; + if (col.getStateFormat() > 1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col)); + return col; + } + + private DocCollection getFromCache(String c){ + ExpiringCachedDocCollection cachedState = collectionStateCache.get(c); + return cachedState != null ? cachedState.cached : null; + } + + + /** + * Useful for determining the minimum achieved replication factor across + * all shards involved in processing an update request, typically useful + * for gauging the replication factor of a batch.
+ */ + @SuppressWarnings("rawtypes") + public int getMinAchievedReplicationFactor(String collection, NamedList resp) { + // it's probably already on the top-level header set by condense + NamedList header = (NamedList)resp.get("responseHeader"); + Integer achRf = (Integer)header.get(UpdateRequest.REPFACT); + if (achRf != null) + return achRf.intValue(); + + // not on the top-level header, walk the shard route tree + Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp); + for (Integer rf : shardRf.values()) { + if (achRf == null || rf < achRf) { + achRf = rf; + } + } + return (achRf != null) ? achRf.intValue() : -1; + } + + /** + * Walks the NamedList response after performing an update request looking for + * the replication factor that was achieved in each shard involved in the request. + * For single doc updates, there will be only one shard in the return value. + */ + @SuppressWarnings("rawtypes") + public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) { + connect(); + + Map<String,Integer> results = new HashMap<String,Integer>(); + if (resp instanceof CloudSolrClient.RouteResponse) { + NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses(); + ClusterState clusterState = zkStateReader.getClusterState(); + Map<String,String> leaders = new HashMap<String,String>(); + for (Slice slice : clusterState.getActiveSlices(collection)) { + Replica leader = slice.getLeader(); + if (leader != null) { + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); + String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName(); + leaders.put(leaderUrl, slice.getName()); + String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection; + leaders.put(altLeaderUrl, slice.getName()); + } + } + + Iterator<Map.Entry<String,Object>> routeIter = routes.iterator(); + while (routeIter.hasNext()) { + Map.Entry<String,Object> next = routeIter.next(); + String host = next.getKey(); + NamedList hostResp = (NamedList)next.getValue(); + Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT); + if (rf != null) { + String shard = leaders.get(host); + if (shard == null) { + if (host.endsWith("/")) + shard = leaders.get(host.substring(0,host.length()-1)); + if (shard == null) { + shard = host; + } + } + results.put(shard, rf); + } + } + } + return results; + } +}
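The client added above is easiest to verify from the caller's side. The following is a minimal usage sketch, not part of the commit: the ZooKeeper connect string, collection name, and field values are placeholders, and it assumes only the constructor, setDefaultCollection(), and close() shown in this file plus the add/commit/query methods inherited from SolrClient.

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrInputDocument;

    public class CloudSolrClientExample {
      public static void main(String[] args) throws Exception {
        // Connects to ZK, watches cluster state, and routes requests through LBHttpSolrClient.
        // "zk1:2181,..." and "collection1" are placeholder values for this sketch.
        CloudSolrClient client = new CloudSolrClient("zk1:2181,zk2:2181,zk3:2181/solr");
        client.setDefaultCollection("collection1"); // used when no collection param is given
        try {
          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", "doc-1"); // documents are routed on the 'id' field unless setIdField() is called
          doc.addField("title_s", "hello");
          client.add(doc);   // an update request; sent to shard leaders since updatesToLeaders defaults to true
          client.commit();   // COMMIT is a non-routable param and is handled on the general path
          QueryResponse rsp = client.query(new SolrQuery("*:*"));
          System.out.println("hits: " + rsp.getResults().getNumFound());
        } finally {
          client.close(); // closes the ZkStateReader and the internally created HttpClient
        }
      }
    }

Because close() tears down the ZkStateReader, the update thread pool, and (for internally created clients) the HttpClient, an application would normally share one long-lived instance rather than constructing a client per request.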
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java new file mode 100644 index 0000000..4e2a2e7 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import org.apache.http.client.HttpClient; + +import java.util.Collection; + +/** + * @deprecated Use {@link org.apache.solr.client.solrj.impl.CloudSolrClient} + */ +@Deprecated +public class CloudSolrServer extends CloudSolrClient { + + public CloudSolrServer(String zkHost) { + super(zkHost); + } + + public CloudSolrServer(String zkHost, HttpClient httpClient) { + super(zkHost, httpClient); + } + + public CloudSolrServer(Collection<String> zkHosts, String chroot) { + super(zkHosts, chroot); + } + + public CloudSolrServer(Collection<String> zkHosts, String chroot, HttpClient httpClient) { + super(zkHosts, chroot, httpClient); + } + + public CloudSolrServer(String zkHost, boolean updatesToLeaders) { + super(zkHost, updatesToLeaders); + } + + public CloudSolrServer(String zkHost, boolean updatesToLeaders, HttpClient httpClient) { + super(zkHost, updatesToLeaders, httpClient); + } + + public CloudSolrServer(String zkHost, LBHttpSolrClient lbClient) { + super(zkHost, lbClient); + } + + public CloudSolrServer(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) { + super(zkHost, lbClient, updatesToLeaders); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java new file mode 100644 index 0000000..5e65021 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentProducer; +import org.apache.http.entity.EntityTemplate; +import org.apache.solr.client.solrj.ResponseParser; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.Locale; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * ConcurrentUpdateSolrClient buffers all added documents and writes + * them into open HTTP connections. This class is thread safe. + * + * Params from {@link UpdateRequest} are converted to http request + * parameters. When params change between UpdateRequests a new HTTP + * request is started. + * + * Although any SolrClient request can be made with this implementation, it is + * only recommended to use ConcurrentUpdateSolrClient with /update + * requests. The class {@link HttpSolrClient} is better suited for the + * query interface. + */ +public class ConcurrentUpdateSolrClient extends SolrClient { + private static final long serialVersionUID = 1L; + static final Logger log = LoggerFactory + .getLogger(ConcurrentUpdateSolrClient.class); + private HttpSolrClient client; + final BlockingQueue<UpdateRequest> queue; + final ExecutorService scheduler; + final Queue<Runner> runners; + volatile CountDownLatch lock = null; // used to block everything + final int threadCount; + boolean shutdownExecutor = false; + int pollQueueTime = 250; + private final boolean streamDeletes; + + /** + * Uses an internally managed HttpClient instance. 
+ * + * @param solrServerUrl + * The Solr server URL + * @param queueSize + * The buffer size before the documents are sent to the server + * @param threadCount + * The number of background threads used to empty the queue + */ + public ConcurrentUpdateSolrClient(String solrServerUrl, int queueSize, + int threadCount) { + this(solrServerUrl, null, queueSize, threadCount); + shutdownExecutor = true; + } + + public ConcurrentUpdateSolrClient(String solrServerUrl, + HttpClient client, int queueSize, int threadCount) { + this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool( + new SolrjNamedThreadFactory("concurrentUpdateScheduler"))); + shutdownExecutor = true; + } + + /** + * Uses the supplied HttpClient to send documents to the Solr server. + */ + public ConcurrentUpdateSolrClient(String solrServerUrl, + HttpClient client, int queueSize, int threadCount, ExecutorService es) { + this(solrServerUrl, client, queueSize, threadCount, es, false); + } + + /** + * Uses the supplied HttpClient to send documents to the Solr server. + */ + public ConcurrentUpdateSolrClient(String solrServerUrl, + HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) { + this.client = new HttpSolrClient(solrServerUrl, client); + this.client.setFollowRedirects(false); + queue = new LinkedBlockingQueue<>(queueSize); + this.threadCount = threadCount; + runners = new LinkedList<>(); + scheduler = es; + this.streamDeletes = streamDeletes; + } + + public Set<String> getQueryParams() { + return this.client.getQueryParams(); + } + + /** + * Expert Method. + * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(Set<String> queryParams) { + this.client.setQueryParams(queryParams); + } + + /** + * Opens a connection and sends everything... 
+   */
+  class Runner implements Runnable {
+    final Lock runnerLock = new ReentrantLock();
+
+    @Override
+    public void run() {
+      runnerLock.lock();
+
+      log.debug("starting runner: {}", this);
+      HttpPost method = null;
+      HttpResponse response = null;
+      try {
+        while (!queue.isEmpty()) {
+          try {
+            final UpdateRequest updateRequest =
+                queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+            if (updateRequest == null)
+              break;
+
+            String contentType = client.requestWriter.getUpdateContentType();
+            final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
+
+            final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+
+            EntityTemplate template = new EntityTemplate(new ContentProducer() {
+
+              @Override
+              public void writeTo(OutputStream out) throws IOException {
+                try {
+                  if (isXml) {
+                    out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+                  }
+                  UpdateRequest req = updateRequest;
+                  while (req != null) {
+                    SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+                    if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+                      queue.add(req); // params are different, push back to queue
+                      break;
+                    }
+
+                    client.requestWriter.write(req, out);
+                    if (isXml) {
+                      // check for commit or optimize
+                      SolrParams params = req.getParams();
+                      if (params != null) {
+                        String fmt = null;
+                        if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+                          fmt = "<optimize waitSearcher=\"%s\" />";
+                        } else if (params.getBool(UpdateParams.COMMIT, false)) {
+                          fmt = "<commit waitSearcher=\"%s\" />";
+                        }
+                        if (fmt != null) {
+                          byte[] content = String.format(Locale.ROOT,
+                              fmt,
+                              params.getBool(UpdateParams.WAIT_SEARCHER, false)
+                                  + "").getBytes(StandardCharsets.UTF_8);
+                          out.write(content);
+                        }
+                      }
+                    }
+                    out.flush();
+                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                  }
+
+                  if (isXml) {
+                    out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+                  }
+
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  log.warn("", e);
+                }
+              }
+            });
+
+            // The parser 'wt=' and 'version=' params are used instead of the
+            // original params
+            ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
+            requestParams.set(CommonParams.WT, client.parser.getWriterType());
+            requestParams.set(CommonParams.VERSION, client.parser.getVersion());
+
+            method = new HttpPost(client.getBaseURL() + "/update"
+                + ClientUtils.toQueryString(requestParams, false));
+            method.setEntity(template);
+            method.addHeader("User-Agent", HttpSolrClient.AGENT);
+            method.addHeader("Content-Type", contentType);
+
+            response = client.getHttpClient().execute(method);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+              StringBuilder msg = new StringBuilder();
+              msg.append(response.getStatusLine().getReasonPhrase());
+              msg.append("\n\n\n\n");
+              msg.append("request: ").append(method.getURI());
+
+              SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
+              // parse out the metadata from the SolrException
+              try {
+                NamedList<Object> resp =
+                    client.parser.processResponse(response.getEntity().getContent(),
+                        response.getEntity().getContentType().getValue());
+                NamedList<Object> error = (NamedList<Object>) resp.get("error");
+                if (error != null)
+                  solrExc.setMetadata((NamedList<String>) error.get("metadata"));
+              } catch (Exception exc) {
+                // don't want to fail to report the error if parsing the response fails
+                log.warn("Failed to parse error response from " + client.getBaseURL() + " due to: " + exc);
+              }
+
+              handleError(solrExc);
+            } else {
+              onSuccess(response);
+            }
+          } finally {
+            try {
+              if (response != null) {
+                response.getEntity().getContent().close();
+              }
+            } catch (Exception ex) {
+              log.warn("", ex);
+            }
+          }
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          throw (OutOfMemoryError) e;
+        }
+        handleError(e);
+      } finally {
+        synchronized (runners) {
+          if (runners.size() == 1 && !queue.isEmpty()) {
+            // keep this runner alive
+            scheduler.execute(this);
+          } else {
+            runners.remove(this);
+            if (runners.isEmpty())
+              runners.notifyAll();
+          }
+        }
+
+        log.debug("finished: {}", this);
+        runnerLock.unlock();
+      }
+    }
+  }
+
+  @Override
+  public NamedList<Object> request(final SolrRequest request)
+      throws SolrServerException, IOException {
+    if (!(request instanceof UpdateRequest)) {
+      return client.request(request);
+    }
+    UpdateRequest req = (UpdateRequest) request;
+
+    // this happens for commit...
+    if (streamDeletes) {
+      if ((req.getDocuments() == null || req.getDocuments().isEmpty())
+          && (req.getDeleteById() == null || req.getDeleteById().isEmpty())
+          && (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) {
+        if (req.getDeleteQuery() == null) {
+          blockUntilFinished();
+          return client.request(request);
+        }
+      }
+    } else {
+      if ((req.getDocuments() == null || req.getDocuments().isEmpty())) {
+        blockUntilFinished();
+        return client.request(request);
+      }
+    }
+
+    SolrParams params = req.getParams();
+    if (params != null) {
+      // check if it is waiting for the searcher
+      if (params.getBool(UpdateParams.WAIT_SEARCHER, false)) {
+        log.info("blocking for commit/optimize");
+        blockUntilFinished(); // empty the queue
+        return client.request(request);
+      }
+    }
+
+    try {
+      CountDownLatch tmpLock = lock;
+      if (tmpLock != null) {
+        tmpLock.await();
+      }
+
+      boolean success = queue.offer(req);
+
+      for (;;) {
+        synchronized (runners) {
+          // see if the queue is half full and we can add more runners
+          // special case: if only using a threadCount of 1 and the queue
+          // is filling up, allow 1 add'l runner to help process the queue
+          if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount)) {
+            // We need more runners, so start a new one.
+            Runner r = new Runner();
+            runners.add(r);
+            scheduler.execute(r);
+          } else {
+            // break out of the retry loop if we added the element to the queue
+            // successfully, *and* while we are still holding the runners lock
+            // to prevent race conditions.
+            if (success)
+              break;
+          }
+        }
+
+        // Retry to add to the queue w/o the runners lock held (else we risk
+        // temporary deadlock).
+        // This retry could also fail because
+        // 1) existing runners were not able to take off any new elements in the queue
+        // 2) the queue was filled back up since our last try
+        // If we succeed, the queue may have been completely emptied, and all
+        // runners stopped. In all cases, we should loop back to the top to see
+        // if we need to start more runners.
+        if (!success) {
+          success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
+        }
+      }
+    } catch (InterruptedException e) {
+      log.error("interrupted", e);
+      throw new IOException(e.getLocalizedMessage(), e); // preserve the cause
+    }
+
+    // return a dummy result: the request is handled asynchronously
+    NamedList<Object> dummy = new NamedList<>();
+    dummy.add("NOTE", "the request is processed in a background stream");
+    return dummy;
+  }
+
+  public synchronized void blockUntilFinished() {
+    lock = new CountDownLatch(1);
+    try {
+      synchronized (runners) {
+        while (!runners.isEmpty()) {
+          try {
+            runners.wait();
+          } catch (InterruptedException e) {
+            // clear the interrupt flag so wait() can be retried;
+            // the loop condition decides when we are actually done
+            Thread.interrupted();
+          }
+
+          if (scheduler.isTerminated())
+            break;
+
+          // if we reach here, then we probably got the notifyAll, but need to check if
+          // the queue is empty before really considering this finished (SOLR-4260)
+          int queueSize = queue.size();
+          if (queueSize > 0) {
+            log.warn("No more runners, but queue still has " + queueSize
+                + " elements; adding more runners to process remaining requests on queue");
+            Runner r = new Runner();
+            runners.add(r);
+            scheduler.execute(r);
+          }
+        }
+      }
+    } finally {
+      lock.countDown();
+      lock = null;
+    }
+  }
+
+  public void handleError(Throwable ex) {
+    log.error("error", ex);
+  }
+
+  /**
+   * Intended to be used as an extension point for doing post-processing after a request completes.
+   */
+  public void onSuccess(HttpResponse resp) {
+    // no-op by design, override to add functionality
+  }
+
+  @Override
+  public void close() {
+    shutdown();
+  }
+
+  @Override
+  @Deprecated
+  public void shutdown() {
+    client.shutdown();
+    if (shutdownExecutor) {
+      scheduler.shutdown();
+      try {
+        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+          scheduler.shutdownNow();
+          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
+            log.error("ExecutorService did not terminate");
+        }
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public void setConnectionTimeout(int timeout) {
+    HttpClientUtil.setConnectionTimeout(client.getHttpClient(), timeout);
+  }
+
+  /**
+   * Set soTimeout (read timeout) on the underlying HttpConnectionManager.
+   * This is desirable for queries, but probably not for indexing.
+   */
+  public void setSoTimeout(int timeout) {
+    HttpClientUtil.setSoTimeout(client.getHttpClient(), timeout);
+  }
+
+  public void shutdownNow() {
+    client.shutdown();
+    if (shutdownExecutor) {
+      scheduler.shutdownNow(); // Cancel currently executing tasks
+      try {
+        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
+          log.error("ExecutorService did not terminate");
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public void setParser(ResponseParser responseParser) {
+    client.setParser(responseParser);
+  }
+
+  /**
+   * @param pollQueueTime time for an open connection to wait for updates when
+   *        the queue is empty.
+   */
+  public void setPollQueueTime(int pollQueueTime) {
+    this.pollQueueTime = pollQueueTime;
+  }
+
+  public void setRequestWriter(RequestWriter requestWriter) {
+    client.setRequestWriter(requestWriter);
+  }
+}
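For orientation while reading the diff above: the client queues UpdateRequests and streams them to Solr from background Runner threads, returning a placeholder NamedList immediately; only a commit/optimize with waitSearcher (or an explicit drain) synchronizes. A minimal usage sketch follows; the base URL, queue size, and thread count are illustrative assumptions, not values taken from this patch:

import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class BulkIndexSketch {
  public static void main(String[] args) throws SolrServerException, IOException {
    // Hypothetical endpoint and tuning values, for illustration only.
    ConcurrentUpdateSolrClient client =
        new ConcurrentUpdateSolrClient("http://localhost:8983/solr/collection1",
            10000 /* queueSize */, 4 /* threadCount */);
    try {
      for (int i = 0; i < 100000; i++) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "doc-" + i);
        client.add(doc); // enqueued; the response is the dummy NamedList described above
      }
      client.commit(); // waitSearcher defaults to true, so this drains the queue first
    } finally {
      client.blockUntilFinished(); // ensure all queued updates were actually sent
      client.close();
    }
  }
}

Note that handleError(Throwable) is the only failure signal for queued updates; applications that need per-batch error handling typically subclass the client and override it.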
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
new file mode 100644
index 0000000..9ace82a
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.http.client.HttpClient;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @deprecated Use {@link org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient}
+ */
+@Deprecated
+public class ConcurrentUpdateSolrServer extends ConcurrentUpdateSolrClient {
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount) {
+    super(solrServerUrl, queueSize, threadCount);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount) {
+    super(solrServerUrl, client, queueSize, threadCount);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, ExecutorService es) {
+    super(solrServerUrl, client, queueSize, threadCount, es);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) {
+    super(solrServerUrl, client, queueSize, threadCount, es, streamDeletes);
+  }
+
+}
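Since ConcurrentUpdateSolrServer is only a deprecated shim over ConcurrentUpdateSolrClient (every constructor just delegates to super), migration is a mechanical rename; a sketch, with an illustrative URL and tuning values:

// Before: deprecated name
ConcurrentUpdateSolrServer server =
    new ConcurrentUpdateSolrServer("http://localhost:8983/solr/collection1", 10000, 4);

// After: same constructor arguments, same behavior
ConcurrentUpdateSolrClient client =
    new ConcurrentUpdateSolrClient("http://localhost:8983/solr/collection1", 10000, 4);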
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
new file mode 100644
index 0000000..1d370ff
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
@@ -0,0 +1,97 @@
+package org.apache.solr.client.solrj.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+ * The default http client configurer. If the behaviour needs to be customized, a
+ * new HttpClientConfigurer can be set by calling
+ * {@link HttpClientUtil#setConfigurer(HttpClientConfigurer)}.
+ */
+public class HttpClientConfigurer {
+
+  public void configure(DefaultHttpClient httpClient, SolrParams config) {
+
+    if (config.get(HttpClientUtil.PROP_MAX_CONNECTIONS) != null) {
+      HttpClientUtil.setMaxConnections(httpClient,
+          config.getInt(HttpClientUtil.PROP_MAX_CONNECTIONS));
+    }
+
+    if (config.get(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST) != null) {
+      HttpClientUtil.setMaxConnectionsPerHost(httpClient,
+          config.getInt(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST));
+    }
+
+    if (config.get(HttpClientUtil.PROP_CONNECTION_TIMEOUT) != null) {
+      HttpClientUtil.setConnectionTimeout(httpClient,
+          config.getInt(HttpClientUtil.PROP_CONNECTION_TIMEOUT));
+    }
+
+    if (config.get(HttpClientUtil.PROP_SO_TIMEOUT) != null) {
+      HttpClientUtil.setSoTimeout(httpClient,
+          config.getInt(HttpClientUtil.PROP_SO_TIMEOUT));
+    }
+
+    if (config.get(HttpClientUtil.PROP_FOLLOW_REDIRECTS) != null) {
+      HttpClientUtil.setFollowRedirects(httpClient,
+          config.getBool(HttpClientUtil.PROP_FOLLOW_REDIRECTS));
+    }
+
+    // always call setUseRetry, whether it is in config or not
+    HttpClientUtil.setUseRetry(httpClient,
+        config.getBool(HttpClientUtil.PROP_USE_RETRY, true));
+
+    final String basicAuthUser = config
+        .get(HttpClientUtil.PROP_BASIC_AUTH_USER);
+    final String basicAuthPass = config
+        .get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
+    HttpClientUtil.setBasicAuth(httpClient, basicAuthUser, basicAuthPass);
+
+    if (config.get(HttpClientUtil.PROP_ALLOW_COMPRESSION) != null) {
+      HttpClientUtil.setAllowCompression(httpClient,
+          config.getBool(HttpClientUtil.PROP_ALLOW_COMPRESSION));
+    }
+
+    boolean sslCheckPeerName = toBooleanDefaultIfNull(
+        toBooleanObject(System.getProperty(HttpClientUtil.SYS_PROP_CHECK_PEER_NAME)), true);
+    if (!sslCheckPeerName) {
+      HttpClientUtil.setHostNameVerifier(httpClient, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+    }
+  }
+
+  public static boolean toBooleanDefaultIfNull(Boolean bool, boolean valueIfNull) {
+    if (bool == null) {
+      return valueIfNull;
+    }
+    return bool.booleanValue();
+  }
+
+  public static Boolean toBooleanObject(String str) {
+    if ("true".equalsIgnoreCase(str)) {
+      return Boolean.TRUE;
+    } else if ("false".equalsIgnoreCase(str)) {
+      return Boolean.FALSE;
+    }
+    // no match
+    return null;
+  }
+}
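Because all of the per-property handling lives in configure(), custom HTTP client behavior can be layered on by subclassing and registering the subclass through HttpClientUtil.setConfigurer(HttpClientConfigurer), as the javadoc above notes. A minimal sketch, assuming registration happens at startup before any client is built; the added header and its value are a hypothetical customization, not part of this patch:

import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HttpContext;
import org.apache.solr.client.solrj.impl.HttpClientConfigurer;
import org.apache.solr.common.params.SolrParams;

public class TaggingHttpClientConfigurer extends HttpClientConfigurer {
  @Override
  public void configure(DefaultHttpClient httpClient, SolrParams config) {
    super.configure(httpClient, config); // keep the default property handling
    // Hypothetical extra step: tag outgoing requests so they can be traced.
    httpClient.addRequestInterceptor(new HttpRequestInterceptor() {
      @Override
      public void process(HttpRequest request, HttpContext context) throws HttpException {
        request.addHeader("X-Client-Origin", "ranger-solrj");
      }
    });
  }
}

// Registered once, before any HttpClient is created through HttpClientUtil:
// HttpClientUtil.setConfigurer(new TaggingHttpClientConfigurer());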
