/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.client.solrj.impl;

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.params.ClientParamBean;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeSocketFactory;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.conn.ssl.X509HostnameVerifier;
import org.apache.http.entity.HttpEntityWrapper;
//import org.apache.http.impl.client.CloseableHttpClient; //RANGER_UPDATE - to use SystemDefaultHttpClient
import org.apache.http.impl.client.AbstractHttpClient; //RANGER_UPDATE
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.SystemDefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; // jdoc
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.protocol.HttpContext;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class for creating/configuring httpclient instances.
 */
public class HttpClientUtil {

  // Socket timeout measured in ms; a socket read that takes longer than
  // this throws java.net.SocketTimeoutException: Read timed out.
  public static final String PROP_SO_TIMEOUT = "socketTimeout";
  // Connection timeout measured in ms; if the connection cannot be
  // established within this window a
  // java.net.SocketTimeoutException: Connection timed out is raised.
  public static final String PROP_CONNECTION_TIMEOUT = "connTimeout";
  // Maximum connections allowed per host
  public static final String PROP_MAX_CONNECTIONS_PER_HOST = "maxConnectionsPerHost";
  // Maximum total connections allowed
  public static final String PROP_MAX_CONNECTIONS = "maxConnections";
  // Retry http requests on error
  public static final String PROP_USE_RETRY = "retry";
  // Allow compression (deflate,gzip) if server supports it
  public static final String PROP_ALLOW_COMPRESSION = "allowCompression";
  // Follow redirects
  public static final String PROP_FOLLOW_REDIRECTS = "followRedirects";
  // Basic auth username
  public static final String PROP_BASIC_AUTH_USER = "httpBasicAuthUser";
  // Basic auth password
  public static final String PROP_BASIC_AUTH_PASS = "httpBasicAuthPassword";

  public static final String SYS_PROP_CHECK_PEER_NAME = "solr.ssl.checkPeerName";

  private static final Logger logger = LoggerFactory
      .getLogger(HttpClientUtil.class);

  // Shared handler that disables retries entirely (0 retries, no retry of
  // requests that were already sent).
  static final DefaultHttpRequestRetryHandler NO_RETRY = new DefaultHttpRequestRetryHandler(
      0, false);

  private static HttpClientConfigurer configurer = new HttpClientConfigurer();

  private HttpClientUtil(){}

  /**
   * Replace the {@link HttpClientConfigurer} class used in configuring the http
   * clients with a custom implementation.
   */
  public static void setConfigurer(HttpClientConfigurer newConfigurer) {
    configurer = newConfigurer;
  }

  public static HttpClientConfigurer getConfigurer() {
    return configurer;
  }

  /**
   * Creates new http client by using the provided configuration.
   *
   * @param params
   *          http client configuration, if null a client with default
   *          configuration (no additional configuration) is created.
   */
  //public static CloseableHttpClient createClient(final SolrParams params) { //RANGER_UPDATE
  public static AbstractHttpClient createClient(final SolrParams params) {
    final ModifiableSolrParams config = new ModifiableSolrParams(params);
    // Parameterized logging defers the toString() cost to when DEBUG is on.
    logger.debug("Creating new http client, config: {}", config);
    final DefaultHttpClient httpClient = new SystemDefaultHttpClient();
    configureClient(httpClient, config);
    return httpClient;
  }

  /**
   * Creates new http client by using the provided configuration and the given
   * connection manager.
   */
  //public static CloseableHttpClient createClient(final SolrParams params, ClientConnectionManager cm) {
  public static AbstractHttpClient createClient(final SolrParams params, ClientConnectionManager cm) {
    final ModifiableSolrParams config = new ModifiableSolrParams(params);
    logger.debug("Creating new http client, config: {}", config);
    final DefaultHttpClient httpClient = new DefaultHttpClient(cm);
    configureClient(httpClient, config);
    return httpClient;
  }

  /**
   * Configures {@link DefaultHttpClient}, only sets parameters if they are
   * present in config.
   */
  public static void configureClient(final DefaultHttpClient httpClient,
      SolrParams config) {
    configurer.configure(httpClient, config);
  }

  public static void close(HttpClient httpClient) {
//    if (httpClient instanceof CloseableHttpClient) { //RANGER_UPDATE
//      org.apache.solr.common.util.IOUtils.closeQuietly((CloseableHttpClient) httpClient); //RANGER_UPDATE
//    } else { //RANGER_UPDATE
      httpClient.getConnectionManager().shutdown();
//    } //RANGER_UPDATE
  }

  /**
   * Control HTTP payload compression.
   *
   * @param allowCompression
   *          true will enable compression (needs support from server), false
   *          will disable compression.
   */
  public static void setAllowCompression(DefaultHttpClient httpClient,
      boolean allowCompression) {
    // Always remove first so the interceptors are never registered twice.
    httpClient
        .removeRequestInterceptorByClass(UseCompressionRequestInterceptor.class);
    httpClient
        .removeResponseInterceptorByClass(UseCompressionResponseInterceptor.class);
    if (allowCompression) {
      httpClient.addRequestInterceptor(new UseCompressionRequestInterceptor());
      httpClient
          .addResponseInterceptor(new UseCompressionResponseInterceptor());
    }
  }

  /**
   * Set http basic auth information. If basicAuthUser or basicAuthPass is null
   * the basic auth configuration is cleared. Currently this is not preemptive
   * authentication. So it is not currently possible to do a post request while
   * using this setting.
   */
  public static void setBasicAuth(DefaultHttpClient httpClient,
      String basicAuthUser, String basicAuthPass) {
    if (basicAuthUser != null && basicAuthPass != null) {
      httpClient.getCredentialsProvider().setCredentials(AuthScope.ANY,
          new UsernamePasswordCredentials(basicAuthUser, basicAuthPass));
    } else {
      httpClient.getCredentialsProvider().clear();
    }
  }

  /**
   * Set max connections allowed per host. This call will only work when
   * {@link ThreadSafeClientConnManager} or
   * {@link PoolingClientConnectionManager} is used.
   */
  public static void setMaxConnectionsPerHost(HttpClient httpClient,
      int max) {
    // would have been nice if there was a common interface
    if (httpClient.getConnectionManager() instanceof ThreadSafeClientConnManager) {
      ThreadSafeClientConnManager mgr = (ThreadSafeClientConnManager)httpClient.getConnectionManager();
      mgr.setDefaultMaxPerRoute(max);
    } else if (httpClient.getConnectionManager() instanceof PoolingClientConnectionManager) {
      PoolingClientConnectionManager mgr = (PoolingClientConnectionManager)httpClient.getConnectionManager();
      mgr.setDefaultMaxPerRoute(max);
    }
  }

  /**
   * Set max total connections allowed. This call will only work when
   * {@link ThreadSafeClientConnManager} or
   * {@link PoolingClientConnectionManager} is used.
   */
  public static void setMaxConnections(final HttpClient httpClient,
      int max) {
    // would have been nice if there was a common interface
    if (httpClient.getConnectionManager() instanceof ThreadSafeClientConnManager) {
      ThreadSafeClientConnManager mgr = (ThreadSafeClientConnManager)httpClient.getConnectionManager();
      mgr.setMaxTotal(max);
    } else if (httpClient.getConnectionManager() instanceof PoolingClientConnectionManager) {
      PoolingClientConnectionManager mgr = (PoolingClientConnectionManager)httpClient.getConnectionManager();
      mgr.setMaxTotal(max);
    }
  }

  /**
   * Defines the socket timeout (SO_TIMEOUT) in milliseconds. A timeout value of
   * zero is interpreted as an infinite timeout.
   *
   * @param timeout timeout in milliseconds
   */
  public static void setSoTimeout(HttpClient httpClient, int timeout) {
    HttpConnectionParams.setSoTimeout(httpClient.getParams(),
        timeout);
  }

  /**
   * Control retry handler.
   *
   * @param useRetry when false the client will not try to retry failed requests.
   */
  public static void setUseRetry(final DefaultHttpClient httpClient,
      boolean useRetry) {
    if (!useRetry) {
      httpClient.setHttpRequestRetryHandler(NO_RETRY);
    } else {
      // if the request is not fully sent, we retry
      // streaming updates are not a problem, because they are not retryable
      httpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(){
        @Override
        protected boolean handleAsIdempotent(final HttpRequest request) {
          return false; // we can't tell if a Solr request is idempotent
        }
      });
    }
  }

  /**
   * Set connection timeout. A timeout value of zero is interpreted as an
   * infinite timeout.
   *
   * @param timeout
   *          connection Timeout in milliseconds
   */
  public static void setConnectionTimeout(final HttpClient httpClient,
      int timeout) {
    HttpConnectionParams.setConnectionTimeout(httpClient.getParams(),
        timeout);
  }

  /**
   * Set follow redirects.
   *
   * @param followRedirects When true the client will follow redirects.
   */
  public static void setFollowRedirects(HttpClient httpClient,
      boolean followRedirects) {
    new ClientParamBean(httpClient.getParams()).setHandleRedirects(followRedirects);
  }

  /**
   * Install a hostname verifier on the client's "https" scheme, if one is
   * registered and backed by an {@link SSLSocketFactory}. A no-op otherwise:
   * a custom (non-SSL) socket factory registered for "https" previously caused
   * a ClassCastException here.
   */
  public static void setHostNameVerifier(DefaultHttpClient httpClient,
      X509HostnameVerifier hostNameVerifier) {
    Scheme httpsScheme = httpClient.getConnectionManager().getSchemeRegistry().get("https");
    if (httpsScheme != null) {
      SchemeSocketFactory factory = httpsScheme.getSchemeSocketFactory();
      // Guard the cast: the registry may hold any SchemeSocketFactory impl.
      if (factory instanceof SSLSocketFactory) {
        ((SSLSocketFactory) factory).setHostnameVerifier(hostNameVerifier);
      }
    }
  }

  public static void setStaleCheckingEnabled(final HttpClient httpClient, boolean enabled) {
    HttpConnectionParams.setStaleCheckingEnabled(httpClient.getParams(), enabled);
  }

  public static void setTcpNoDelay(final HttpClient httpClient, boolean tcpNoDelay) {
    HttpConnectionParams.setTcpNoDelay(httpClient.getParams(), tcpNoDelay);
  }

  /**
   * Adds an Accept-Encoding header advertising gzip/deflate support, unless
   * the request already carries one.
   */
  private static class UseCompressionRequestInterceptor implements
      HttpRequestInterceptor {

    @Override
    public void process(HttpRequest request, HttpContext context)
        throws HttpException, IOException {
      if (!request.containsHeader("Accept-Encoding")) {
        request.addHeader("Accept-Encoding", "gzip, deflate");
      }
    }
  }

  /**
   * Wraps the response entity in a decompressing wrapper when the server
   * declared a gzip or deflate Content-Encoding.
   */
  private static class UseCompressionResponseInterceptor implements
      HttpResponseInterceptor {

    @Override
    public void process(final HttpResponse response, final HttpContext context)
        throws HttpException, IOException {

      HttpEntity entity = response.getEntity();
      // Entity may be null (e.g. responses with no body); previously this
      // dereferenced it unconditionally and could throw NullPointerException.
      if (entity == null) {
        return;
      }
      Header ceheader = entity.getContentEncoding();
      if (ceheader != null) {
        HeaderElement[] codecs = ceheader.getElements();
        for (int i = 0; i < codecs.length; i++) {
          if (codecs[i].getName().equalsIgnoreCase("gzip")) {
            response
                .setEntity(new GzipDecompressingEntity(response.getEntity()));
            return;
          }
          if (codecs[i].getName().equalsIgnoreCase("deflate")) {
            response.setEntity(new DeflateDecompressingEntity(response
                .getEntity()));
            return;
          }
        }
      }
    }
  }

  /**
   * Entity wrapper that gunzips the wrapped entity's content stream.
   * Content length is reported as -1 (unknown) since the decompressed
   * size differs from the transmitted size.
   */
  private static class GzipDecompressingEntity extends HttpEntityWrapper {
    public GzipDecompressingEntity(final HttpEntity entity) {
      super(entity);
    }

    @Override
    public InputStream getContent() throws IOException, IllegalStateException {
      return new GZIPInputStream(wrappedEntity.getContent());
    }

    @Override
    public long getContentLength() {
      return -1;
    }
  }

  /**
   * Entity wrapper that inflates (deflate encoding) the wrapped entity's
   * content stream; inherits the unknown content length from the gzip wrapper.
   */
  private static class DeflateDecompressingEntity extends
      GzipDecompressingEntity {
    public DeflateDecompressingEntity(final HttpEntity entity) {
      super(entity);
    }

    @Override
    public InputStream getContent() throws IOException, IllegalStateException {
      return new InflaterInputStream(wrappedEntity.getContent());
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java new file mode 100644 index 0000000..a073265 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java @@ -0,0 +1,821 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.client.solrj.impl; + +import org.apache.commons.io.IOUtils; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.entity.mime.FormBodyPart; +import org.apache.http.entity.mime.HttpMultipartMode; +import org.apache.http.entity.mime.MultipartEntity; +import org.apache.http.entity.mime.content.InputStreamBody; +import org.apache.http.entity.mime.content.StringBody; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.message.BasicHeader; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.apache.solr.client.solrj.ResponseParser; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import 
org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class HttpSolrClient extends SolrClient { + + private static final String UTF_8 = StandardCharsets.UTF_8.name(); + private static final String DEFAULT_PATH = "/select"; + private static final long serialVersionUID = -946812319974801896L; + + /** + * User-Agent String. + */ + public static final String AGENT = "Solr[" + HttpSolrClient.class.getName() + "] 1.0"; + + private static Logger log = LoggerFactory.getLogger(HttpSolrClient.class); + + /** + * The URL of the Solr server. + */ + protected volatile String baseUrl; + + /** + * Default value: null / empty. + * <p> + * Parameters that are added to every request regardless. This may be a place + * to add something like an authentication token. + */ + protected ModifiableSolrParams invariantParams; + + /** + * Default response parser is BinaryResponseParser + * <p> + * This parser represents the default Response Parser chosen to parse the + * response if the parser were not specified as part of the request. 
+ * + * @see org.apache.solr.client.solrj.impl.BinaryResponseParser + */ + protected volatile ResponseParser parser; + + /** + * The RequestWriter used to write all requests to Solr + * + * @see org.apache.solr.client.solrj.request.RequestWriter + */ + protected volatile RequestWriter requestWriter = new RequestWriter(); + + private final HttpClient httpClient; + + private volatile boolean followRedirects = false; + + private volatile int maxRetries = 0; + + private volatile boolean useMultiPartPost; + private final boolean internalClient; + + private volatile Set<String> queryParams = Collections.emptySet(); + + /** + * @param baseURL + * The URL of the Solr server. For example, " + * <code>http://localhost:8983/solr/</code>" if you are using the + * standard distribution Solr webapp on your local machine. + */ + public HttpSolrClient(String baseURL) { + this(baseURL, null, new BinaryResponseParser()); + } + + public HttpSolrClient(String baseURL, HttpClient client) { + this(baseURL, client, new BinaryResponseParser()); + } + + public HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser) { + this.baseUrl = baseURL; + if (baseUrl.endsWith("/")) { + baseUrl = baseUrl.substring(0, baseUrl.length() - 1); + } + if (baseUrl.indexOf('?') >= 0) { + throw new RuntimeException( + "Invalid base url for solrj. 
The base URL must not contain parameters: " + + baseUrl); + } + + if (client != null) { + httpClient = client; + internalClient = false; + } else { + internalClient = true; + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32); + params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects); + httpClient = HttpClientUtil.createClient(params); + } + + this.parser = parser; + } + + public Set<String> getQueryParams() { + return queryParams; + } + + /** + * Expert Method + * @param queryParams set of param keys to only send via the query string + * Note that the param will be sent as a query string if the key is part + * of this Set or the SolrRequest's query params. + * @see org.apache.solr.client.solrj.SolrRequest#getQueryParams + */ + public void setQueryParams(Set<String> queryParams) { + this.queryParams = queryParams; + } + + /** + * Process the request. If + * {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} is + * null, then use {@link #getParser()} + * + * @param request + * The {@link org.apache.solr.client.solrj.SolrRequest} to process + * @return The {@link org.apache.solr.common.util.NamedList} result + * @throws IOException If there is a low-level I/O error. 
+ * + * @see #request(org.apache.solr.client.solrj.SolrRequest, + * org.apache.solr.client.solrj.ResponseParser) + */ + @Override + public NamedList<Object> request(final SolrRequest request) + throws SolrServerException, IOException { + ResponseParser responseParser = request.getResponseParser(); + if (responseParser == null) { + responseParser = parser; + } + return request(request, responseParser); + } + + public NamedList<Object> request(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException { + return executeMethod(createMethod(request),processor); + } + + /** + * @lucene.experimental + */ + public static class HttpUriRequestResponse { + public HttpUriRequest httpUriRequest; + public Future<NamedList<Object>> future; + } + + /** + * @lucene.experimental + */ + public HttpUriRequestResponse httpUriRequest(final SolrRequest request) + throws SolrServerException, IOException { + ResponseParser responseParser = request.getResponseParser(); + if (responseParser == null) { + responseParser = parser; + } + return httpUriRequest(request, responseParser); + } + + /** + * @lucene.experimental + */ + public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException { + HttpUriRequestResponse mrr = new HttpUriRequestResponse(); + final HttpRequestBase method = createMethod(request); + ExecutorService pool = Executors.newFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest")); + try { + mrr.future = pool.submit(new Callable<NamedList<Object>>(){ + + @Override + public NamedList<Object> call() throws Exception { + return executeMethod(method, processor); + }}); + + } finally { + pool.shutdown(); + } + assert method != null; + mrr.httpUriRequest = method; + return mrr; + } + + protected ModifiableSolrParams calculateQueryParams(Set<String> queryParamNames, + ModifiableSolrParams wparams) { + ModifiableSolrParams queryModParams = new 
ModifiableSolrParams(); + if (queryParamNames != null) { + for (String param : queryParamNames) { + String[] value = wparams.getParams(param) ; + if (value != null) { + for (String v : value) { + queryModParams.add(param, v); + } + wparams.remove(param); + } + } + } + return queryModParams; + } + + protected HttpRequestBase createMethod(final SolrRequest request) throws IOException, SolrServerException { + HttpRequestBase method = null; + InputStream is = null; + SolrParams params = request.getParams(); + Collection<ContentStream> streams = requestWriter.getContentStreams(request); + String path = requestWriter.getPath(request); + if (path == null || !path.startsWith("/")) { + path = DEFAULT_PATH; + } + + ResponseParser parser = request.getResponseParser(); + if (parser == null) { + parser = this.parser; + } + + // The parser 'wt=' and 'version=' params are used instead of the original + // params + ModifiableSolrParams wparams = new ModifiableSolrParams(params); + if (parser != null) { + wparams.set(CommonParams.WT, parser.getWriterType()); + wparams.set(CommonParams.VERSION, parser.getVersion()); + } + if (invariantParams != null) { + wparams.add(invariantParams); + } + + int tries = maxRetries + 1; + try { + while( tries-- > 0 ) { + // Note: since we aren't do intermittent time keeping + // ourselves, the potential non-timeout latency could be as + // much as tries-times (plus scheduling effects) the given + // timeAllowed. + try { + if( SolrRequest.METHOD.GET == request.getMethod() ) { + if( streams != null ) { + throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" 
); + } + method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( wparams, false ) ); + } + else if( SolrRequest.METHOD.POST == request.getMethod() || SolrRequest.METHOD.PUT == request.getMethod() ) { + + String url = baseUrl + path; + boolean hasNullStreamName = false; + if (streams != null) { + for (ContentStream cs : streams) { + if (cs.getName() == null) { + hasNullStreamName = true; + break; + } + } + } + boolean isMultipart = ((this.useMultiPartPost && SolrRequest.METHOD.POST == request.getMethod()) + || ( streams != null && streams.size() > 1 )) && !hasNullStreamName; + + LinkedList<NameValuePair> postOrPutParams = new LinkedList<>(); + if (streams == null || isMultipart) { + // send server list and request list as query string params + ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams); + queryParams.add(calculateQueryParams(request.getQueryParams(), wparams)); + String fullQueryUrl = url + ClientUtils.toQueryString( queryParams, false ); + HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ? 
+ new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl); + if (!isMultipart) { + postOrPut.addHeader("Content-Type", + "application/x-www-form-urlencoded; charset=UTF-8"); + } + + List<FormBodyPart> parts = new LinkedList<>(); + Iterator<String> iter = wparams.getParameterNamesIterator(); + while (iter.hasNext()) { + String p = iter.next(); + String[] vals = wparams.getParams(p); + if (vals != null) { + for (String v : vals) { + if (isMultipart) { + parts.add(new FormBodyPart(p, new StringBody(v, StandardCharsets.UTF_8))); + } else { + postOrPutParams.add(new BasicNameValuePair(p, v)); + } + } + } + } + + if (isMultipart && streams != null) { + for (ContentStream content : streams) { + String contentType = content.getContentType(); + if(contentType==null) { + contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default + } + String name = content.getName(); + if(name==null) { + name = ""; + } + parts.add(new FormBodyPart(name, + new InputStreamBody( + content.getStream(), + contentType, + content.getName()))); + } + } + + if (parts.size() > 0) { + MultipartEntity entity = new MultipartEntity(HttpMultipartMode.STRICT); + for(FormBodyPart p: parts) { + entity.addPart(p); + } + postOrPut.setEntity(entity); + } else { + //not using multipart + postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8)); + } + + method = postOrPut; + } + // It is has one stream, it is the post body, put the params in the URL + else { + String pstr = ClientUtils.toQueryString(wparams, false); + HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ? 
+ new HttpPost(url + pstr) : new HttpPut(url + pstr); + + // Single stream as body + // Using a loop just to get the first one + final ContentStream[] contentStream = new ContentStream[1]; + for (ContentStream content : streams) { + contentStream[0] = content; + break; + } + if (contentStream[0] instanceof RequestWriter.LazyContentStream) { + postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), -1) { + @Override + public Header getContentType() { + return new BasicHeader("Content-Type", contentStream[0].getContentType()); + } + + @Override + public boolean isRepeatable() { + return false; + } + + }); + } else { + postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), -1) { + @Override + public Header getContentType() { + return new BasicHeader("Content-Type", contentStream[0].getContentType()); + } + + @Override + public boolean isRepeatable() { + return false; + } + }); + } + method = postOrPut; + } + } + else { + throw new SolrServerException("Unsupported method: "+request.getMethod() ); + } + } + catch( NoHttpResponseException r ) { + method = null; + if(is != null) { + is.close(); + } + // If out of tries then just rethrow (as normal error). + if (tries < 1) { + throw r; + } + } + } + } catch (IOException ex) { + throw new SolrServerException("error reading streams", ex); + } + + return method; + } + + protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor) throws SolrServerException { + method.addHeader("User-Agent", AGENT); + + InputStream respBody = null; + boolean shouldClose = true; + boolean success = false; + try { + // Execute the method. 
+ final HttpResponse response = httpClient.execute(method); + int httpStatus = response.getStatusLine().getStatusCode(); + + // Read the contents + respBody = response.getEntity().getContent(); + Header ctHeader = response.getLastHeader("content-type"); + String contentType; + if (ctHeader != null) { + contentType = ctHeader.getValue(); + } else { + contentType = ""; + } + + // handle some http level checks before trying to parse the response + switch (httpStatus) { + case HttpStatus.SC_OK: + case HttpStatus.SC_BAD_REQUEST: + case HttpStatus.SC_CONFLICT: // 409 + break; + case HttpStatus.SC_MOVED_PERMANENTLY: + case HttpStatus.SC_MOVED_TEMPORARILY: + if (!followRedirects) { + throw new SolrServerException("Server at " + getBaseURL() + + " sent back a redirect (" + httpStatus + ")."); + } + break; + default: + if (processor == null) { + throw new RemoteSolrException(baseUrl, httpStatus, "non ok status: " + httpStatus + + ", message:" + response.getStatusLine().getReasonPhrase(), + null); + } + } + if (processor == null) { + + // no processor specified, return raw stream + NamedList<Object> rsp = new NamedList<>(); + rsp.add("stream", respBody); + // Only case where stream should not be closed + shouldClose = false; + success = true; + return rsp; + } + + String procCt = processor.getContentType(); + if (procCt != null) { + String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT); + String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT); + if (!procMimeType.equals(mimeType)) { + // unexpected mime type + String msg = "Expected mime type " + procMimeType + " but got " + mimeType + "."; + Header encodingHeader = response.getEntity().getContentEncoding(); + String encoding; + if (encodingHeader != null) { + encoding = encodingHeader.getValue(); + } else { + encoding = "UTF-8"; // try UTF-8 + } + try { + msg = msg + " " + IOUtils.toString(respBody, encoding); + } catch (IOException e) { + throw 
new RemoteSolrException(baseUrl, httpStatus, "Could not parse response with encoding " + encoding, e); + } + throw new RemoteSolrException(baseUrl, httpStatus, msg, null); + } + } + + NamedList<Object> rsp = null; + String charset = EntityUtils.getContentCharSet(response.getEntity()); + try { + rsp = processor.processResponse(respBody, charset); + } catch (Exception e) { + throw new RemoteSolrException(baseUrl, httpStatus, e.getMessage(), e); + } + if (httpStatus != HttpStatus.SC_OK) { + NamedList<String> metadata = null; + String reason = null; + try { + NamedList err = (NamedList) rsp.get("error"); + if (err != null) { + reason = (String) err.get("msg"); + if(reason == null) { + reason = (String) err.get("trace"); + } + metadata = (NamedList<String>)err.get("metadata"); + } + } catch (Exception ex) {} + if (reason == null) { + StringBuilder msg = new StringBuilder(); + msg.append(response.getStatusLine().getReasonPhrase()); + msg.append("\n\n"); + msg.append("request: " + method.getURI()); + reason = java.net.URLDecoder.decode(msg.toString(), UTF_8); + } + RemoteSolrException rss = new RemoteSolrException(baseUrl, httpStatus, reason, null); + if (metadata != null) rss.setMetadata(metadata); + throw rss; + } + success = true; + return rsp; + } catch (ConnectException e) { + throw new SolrServerException("Server refused connection at: " + + getBaseURL(), e); + } catch (SocketTimeoutException e) { + throw new SolrServerException( + "Timeout occured while waiting response from server at: " + + getBaseURL(), e); + } catch (IOException e) { + throw new SolrServerException( + "IOException occured when talking to server at: " + getBaseURL(), e); + } finally { + if (respBody != null && shouldClose) { + try { + respBody.close(); + } catch (IOException e) { + log.error("", e); + } finally { + if (!success) { + method.abort(); + } + } + } + } + } + + // ------------------------------------------------------------------- + // 
------------------------------------------------------------------- + + /** + * Retrieve the default list of parameters are added to every request + * regardless. + * + * @see #invariantParams + */ + public ModifiableSolrParams getInvariantParams() { + return invariantParams; + } + + public String getBaseURL() { + return baseUrl; + } + + public void setBaseURL(String baseURL) { + this.baseUrl = baseURL; + } + + public ResponseParser getParser() { + return parser; + } + + /** + * Note: This setter method is <b>not thread-safe</b>. + * + * @param processor + * Default Response Parser chosen to parse the response if the parser + * were not specified as part of the request. + * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser() + */ + public void setParser(ResponseParser processor) { + parser = processor; + } + + /** + * Return the HttpClient this instance uses. + */ + public HttpClient getHttpClient() { + return httpClient; + } + + /** + * HttpConnectionParams.setConnectionTimeout + * + * @param timeout + * Timeout in milliseconds + **/ + public void setConnectionTimeout(int timeout) { + HttpClientUtil.setConnectionTimeout(httpClient, timeout); + } + + /** + * Set SoTimeout (read timeout). This is desirable + * for queries, but probably not for indexing. + * + * @param timeout + * Timeout in milliseconds + **/ + public void setSoTimeout(int timeout) { + HttpClientUtil.setSoTimeout(httpClient, timeout); + } + + /** + * Configure whether the client should follow redirects or not. + * <p> + * This defaults to false under the assumption that if you are following a + * redirect to get to a Solr installation, something is misconfigured + * somewhere. + * </p> + */ + public void setFollowRedirects(boolean followRedirects) { + this.followRedirects = followRedirects; + HttpClientUtil.setFollowRedirects(httpClient, followRedirects); + } + + /** + * Allow server->client communication to be compressed. Currently gzip and + * deflate are supported. 
If the server supports compression the response will + * be compressed. This method is only allowed if the http client is of type + * DefaultHttpClient. + */ + public void setAllowCompression(boolean allowCompression) { + if (httpClient instanceof DefaultHttpClient) { + HttpClientUtil.setAllowCompression((DefaultHttpClient) httpClient, allowCompression); + } else { + throw new UnsupportedOperationException( + "HttpClient instance was not of type DefaultHttpClient"); + } + } + + /** + * Set maximum number of retries to attempt in the event of transient errors. + * <p> + * Maximum number of retries to attempt in the event of transient errors. + * Default: 0 (no) retries. No more than 1 recommended. + * </p> + * @param maxRetries + * No more than 1 recommended + */ + public void setMaxRetries(int maxRetries) { + if (maxRetries > 1) { + log.warn("HttpSolrServer: maximum Retries " + maxRetries + + " > 1. Maximum recommended retries is 1."); + } + this.maxRetries = maxRetries; + } + + public void setRequestWriter(RequestWriter requestWriter) { + this.requestWriter = requestWriter; + } + + /** + * Adds the documents supplied by the given iterator. + * + * @param docIterator + * the iterator which returns SolrInputDocument instances + * + * @return the response from the SolrServer + */ + public UpdateResponse add(Iterator<SolrInputDocument> docIterator) + throws SolrServerException, IOException { + UpdateRequest req = new UpdateRequest(); + req.setDocIterator(docIterator); + return req.process(this); + } + + /** + * Adds the beans supplied by the given iterator. 
+ * + * @param beanIterator + * the iterator which returns Beans + * + * @return the response from the SolrServer + */ + public UpdateResponse addBeans(final Iterator<?> beanIterator) + throws SolrServerException, IOException { + UpdateRequest req = new UpdateRequest(); + req.setDocIterator(new Iterator<SolrInputDocument>() { + + @Override + public boolean hasNext() { + return beanIterator.hasNext(); + } + + @Override + public SolrInputDocument next() { + Object o = beanIterator.next(); + if (o == null) return null; + return getBinder().toSolrInputDocument(o); + } + + @Override + public void remove() { + beanIterator.remove(); + } + }); + return req.process(this); + } + + /** + * Close the {@link ClientConnectionManager} from the internal client. + */ + @Override + public void close() throws IOException { + shutdown(); + } + + @Override + @Deprecated + public void shutdown() { + if (httpClient != null && internalClient) { + HttpClientUtil.close(httpClient); + } + } + + /** + * Set the maximum number of connections that can be open to a single host at + * any given time. If http client was created outside the operation is not + * allowed. + */ + public void setDefaultMaxConnectionsPerHost(int max) { + if (internalClient) { + HttpClientUtil.setMaxConnectionsPerHost(httpClient, max); + } else { + throw new UnsupportedOperationException( + "Client was created outside of HttpSolrServer"); + } + } + + /** + * Set the maximum number of connections that can be open at any given time. + * If http client was created outside the operation is not allowed. 
+ */ + public void setMaxTotalConnections(int max) { + if (internalClient) { + HttpClientUtil.setMaxConnections(httpClient, max); + } else { + throw new UnsupportedOperationException( + "Client was created outside of HttpSolrServer"); + } + } + + public boolean isUseMultiPartPost() { + return useMultiPartPost; + } + + /** + * Set the multipart connection properties + */ + public void setUseMultiPartPost(boolean useMultiPartPost) { + this.useMultiPartPost = useMultiPartPost; + } + + /** + * Subclass of SolrException that allows us to capture an arbitrary HTTP + * status code that may have been returned by the remote server or a + * proxy along the way. + */ + public static class RemoteSolrException extends SolrException { + /** + * @param remoteHost the host the error was received from + * @param code Arbitrary HTTP status code + * @param msg Exception Message + * @param th Throwable to wrap with this Exception + */ + public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) { + super(code, "Error from server at " + remoteHost + ": " + msg, th); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java new file mode 100644 index 0000000..ef5d439 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.ResponseParser; + +/** + * @deprecated Use {@link org.apache.solr.client.solrj.impl.HttpSolrClient} + */ +@Deprecated +public class HttpSolrServer extends HttpSolrClient { + + public HttpSolrServer(String baseURL) { + super(baseURL); + } + + public HttpSolrServer(String baseURL, HttpClient client) { + super(baseURL, client); + } + + public HttpSolrServer(String baseURL, HttpClient client, ResponseParser parser) { + super(baseURL, client, parser); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java new file mode 100644 index 0000000..c524b1f --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java @@ -0,0 +1,730 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.impl; + +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.*; +import org.apache.solr.client.solrj.request.IsUpdateRequest; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.apache.solr.common.SolrException; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.ConnectException; +import java.net.MalformedURLException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URL; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.*; + +/** + * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around + * {@link HttpSolrClient}. This is useful when you + * have multiple Solr servers and the requests need to be Load Balanced among them. + * + * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the + * correct master; no inter-node routing is done. 
+ * + * In SolrCloud (leader/replica) scenarios, it is usually better to use + * {@link CloudSolrClient}, but this class may be used + * for updates because the server will forward them to the appropriate leader. + * + * <p> + * It offers automatic failover when a server goes down and it detects when the server comes back up. + * <p> + * Load balancing is done using a simple round-robin on the list of servers. + * <p> + * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken + * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server. + * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds, + * and if not it fails. + * <blockquote><pre> + * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr"); + * //or if you wish to pass the HttpClient do as follows + * httpClient httpClient = new HttpClient(); + * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr"); + * </pre></blockquote> + * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread. + * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute. + * <p> + * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external + * load balancer. Alternatives to this code are to use + * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. 
See <a + * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a> + * + * @since solr 1.4 + */ +public class LBHttpSolrClient extends SolrClient { + private static Set<Integer> RETRY_CODES = new HashSet<>(4); + + static { + RETRY_CODES.add(404); + RETRY_CODES.add(403); + RETRY_CODES.add(503); + RETRY_CODES.add(500); + } + + // keys to the maps are currently of the form "http://localhost:8983/solr" + // which should be equivalent to HttpSolrServer.getBaseURL() + private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>(); + // access to aliveServers should be synchronized on itself + + protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>(); + + // changes to aliveServers are reflected in this array, no need to synchronize + private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0]; + + + private ScheduledExecutorService aliveCheckExecutor; + + private final HttpClient httpClient; + private final boolean clientIsInternal; + private final AtomicInteger counter = new AtomicInteger(-1); + + private static final SolrQuery solrQuery = new SolrQuery("*:*"); + private volatile ResponseParser parser; + private volatile RequestWriter requestWriter; + + private Set<String> queryParams = new HashSet<>(); + + static { + solrQuery.setRows(0); + /** + * Default sort (if we don't supply a sort) is by score and since + * we request 0 rows any sorting and scoring is not necessary. + * SolrQuery.DOCID schema-independently specifies a non-scoring sort. + * <code>_docid_ asc</code> sort is efficient, + * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort. + */ + solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc); + // not a top-level request, we are interested only in the server being sent to i.e. 
it need not distribute our request to further servers + solrQuery.setDistrib(false); + } + + protected static class ServerWrapper { + + final HttpSolrClient client; + + long lastUsed; // last time used for a real request + long lastChecked; // last time checked for liveness + + // "standard" servers are used by default. They normally live in the alive list + // and move to the zombie list when unavailable. When they become available again, + // they move back to the alive list. + boolean standard = true; + + int failedPings = 0; + + public ServerWrapper(HttpSolrClient client) { + this.client = client; + } + + @Override + public String toString() { + return client.getBaseURL(); + } + + public String getKey() { + return client.getBaseURL(); + } + + @Override + public int hashCode() { + return this.getKey().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof ServerWrapper)) return false; + return this.getKey().equals(((ServerWrapper)obj).getKey()); + } + } + + public static class Req { + protected SolrRequest request; + protected List<String> servers; + protected int numDeadServersToTry; + + public Req(SolrRequest request, List<String> servers) { + this.request = request; + this.servers = servers; + this.numDeadServersToTry = servers.size(); + } + + public SolrRequest getRequest() { + return request; + } + public List<String> getServers() { + return servers; + } + + /** @return the number of dead servers to try if there are no live servers left */ + public int getNumDeadServersToTry() { + return numDeadServersToTry; + } + + /** @param numDeadServersToTry The number of dead servers to try if there are no live servers left. + * Defaults to the number of servers in this request. 
*/ + public void setNumDeadServersToTry(int numDeadServersToTry) { + this.numDeadServersToTry = numDeadServersToTry; + } + } + + public static class Rsp { + protected String server; + protected NamedList<Object> rsp; + + /** The response from the server */ + public NamedList<Object> getResponse() { + return rsp; + } + + /** The server that returned the response */ + public String getServer() { + return server; + } + } + + public LBHttpSolrClient(String... solrServerUrls) throws MalformedURLException { + this(null, solrServerUrls); + } + + /** The provided httpClient should use a multi-threaded connection manager */ + public LBHttpSolrClient(HttpClient httpClient, String... solrServerUrl) { + this(httpClient, new BinaryResponseParser(), solrServerUrl); + } + + /** The provided httpClient should use a multi-threaded connection manager */ + public LBHttpSolrClient(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) { + clientIsInternal = (httpClient == null); + this.parser = parser; + if (httpClient == null) { + ModifiableSolrParams params = new ModifiableSolrParams(); + if (solrServerUrl.length > 1) { + // we prefer retrying another server + params.set(HttpClientUtil.PROP_USE_RETRY, false); + } else { + params.set(HttpClientUtil.PROP_USE_RETRY, true); + } + this.httpClient = HttpClientUtil.createClient(params); + } else { + this.httpClient = httpClient; + } + for (String s : solrServerUrl) { + ServerWrapper wrapper = new ServerWrapper(makeSolrClient(s)); + aliveServers.put(wrapper.getKey(), wrapper); + } + updateAliveList(); + } + + public Set<String> getQueryParams() { + return queryParams; + } + + /** + * Expert Method. 
+ * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(Set<String> queryParams) { + this.queryParams = queryParams; + } + public void addQueryParams(String queryOnlyParam) { + this.queryParams.add(queryOnlyParam) ; + } + + public static String normalize(String server) { + if (server.endsWith("/")) + server = server.substring(0, server.length() - 1); + return server; + } + + protected HttpSolrClient makeSolrClient(String server) { + HttpSolrClient client = new HttpSolrClient(server, httpClient, parser); + if (requestWriter != null) { + client.setRequestWriter(requestWriter); + } + if (queryParams != null) { + client.setQueryParams(queryParams); + } + return client; + } + + /** + * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped. + * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of + * time, or until a test request on that server succeeds. + * + * Servers are queried in the exact order given (except servers currently in the dead pool are skipped). + * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried. + * Req.getNumDeadServersToTry() controls how many dead servers will be tried. + * + * If no live servers are found a SolrServerException is thrown. + * + * @param req contains both the request as well as the list of servers to query + * + * @return the result of the request + * + * @throws IOException If there is a low-level I/O error. 
+ */ + public Rsp request(Req req) throws SolrServerException, IOException { + Rsp rsp = new Rsp(); + Exception ex = null; + boolean isUpdate = req.request instanceof IsUpdateRequest; + List<ServerWrapper> skipped = null; + + long timeAllowedNano = getTimeAllowedInNanos(req.getRequest()); + long timeOutTime = System.nanoTime() + timeAllowedNano; + for (String serverStr : req.getServers()) { + if(isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + serverStr = normalize(serverStr); + // if the server is currently a zombie, just skip to the next one + ServerWrapper wrapper = zombieServers.get(serverStr); + if (wrapper != null) { + // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr); + final int numDeadServersToTry = req.getNumDeadServersToTry(); + if (numDeadServersToTry > 0) { + if (skipped == null) { + skipped = new ArrayList<>(numDeadServersToTry); + skipped.add(wrapper); + } + else if (skipped.size() < numDeadServersToTry) { + skipped.add(wrapper); + } + } + continue; + } + rsp.server = serverStr; + HttpSolrClient client = makeSolrClient(serverStr); + + ex = doRequest(client, req, rsp, isUpdate, false, null); + if (ex == null) { + return rsp; // SUCCESS + } + } + + // try the servers we previously skipped + if (skipped != null) { + for (ServerWrapper wrapper : skipped) { + if(isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey()); + if (ex == null) { + return rsp; // SUCCESS + } + } + } + + + if (ex == null) { + throw new SolrServerException("No live SolrServers available to handle this request"); + } else { + throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex); + } + + } + + protected Exception addZombie(HttpSolrClient server, Exception e) { + + ServerWrapper wrapper; + + wrapper = new ServerWrapper(server); + wrapper.lastUsed = System.currentTimeMillis(); + wrapper.standard = false; + 
zombieServers.put(wrapper.getKey(), wrapper); + startAliveCheckExecutor(); + return e; + } + + protected Exception doRequest(HttpSolrClient client, Req req, Rsp rsp, boolean isUpdate, + boolean isZombie, String zombieKey) throws SolrServerException, IOException { + Exception ex = null; + try { + rsp.rsp = client.request(req.getRequest()); + if (isZombie) { + zombieServers.remove(zombieKey); + } + } catch (SolrException e) { + // we retry on 404 or 403 or 503 or 500 + // unless it's an update - then we only retry on connect exception + if (!isUpdate && RETRY_CODES.contains(e.code())) { + ex = (!isZombie) ? addZombie(client, e) : e; + } else { + // Server is alive but the request was likely malformed or invalid + if (isZombie) { + zombieServers.remove(zombieKey); + } + throw e; + } + } catch (SocketException e) { + if (!isUpdate || e instanceof ConnectException) { + ex = (!isZombie) ? addZombie(client, e) : e; + } else { + throw e; + } + } catch (SocketTimeoutException e) { + if (!isUpdate) { + ex = (!isZombie) ? addZombie(client, e) : e; + } else { + throw e; + } + } catch (SolrServerException e) { + Throwable rootCause = e.getRootCause(); + if (!isUpdate && rootCause instanceof IOException) { + ex = (!isZombie) ? addZombie(client, e) : e; + } else if (isUpdate && rootCause instanceof ConnectException) { + ex = (!isZombie) ? 
addZombie(client, e) : e; + } else { + throw e; + } + } catch (Exception e) { + throw new SolrServerException(e); + } + + return ex; + } + + private void updateAliveList() { + synchronized (aliveServers) { + aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]); + } + } + + private ServerWrapper removeFromAlive(String key) { + synchronized (aliveServers) { + ServerWrapper wrapper = aliveServers.remove(key); + if (wrapper != null) + updateAliveList(); + return wrapper; + } + } + + private void addToAlive(ServerWrapper wrapper) { + synchronized (aliveServers) { + ServerWrapper prev = aliveServers.put(wrapper.getKey(), wrapper); + // TODO: warn if there was a previous entry? + updateAliveList(); + } + } + + public void addSolrServer(String server) throws MalformedURLException { + HttpSolrClient client = makeSolrClient(server); + addToAlive(new ServerWrapper(client)); + } + + public String removeSolrServer(String server) { + try { + server = new URL(server).toExternalForm(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + if (server.endsWith("/")) { + server = server.substring(0, server.length() - 1); + } + + // there is a small race condition here - if the server is in the process of being moved between + // lists, we could fail to remove it. + removeFromAlive(server); + zombieServers.remove(server); + return null; + } + + public void setConnectionTimeout(int timeout) { + HttpClientUtil.setConnectionTimeout(httpClient, timeout); + } + + /** + * set soTimeout (read timeout) on the underlying HttpConnectionManager. This is desirable for queries, but probably + * not for indexing. 
+ */ + public void setSoTimeout(int timeout) { + HttpClientUtil.setSoTimeout(httpClient, timeout); + } + + @Override + public void close() { + shutdown(); + } + + @Override + @Deprecated + public void shutdown() { + if (aliveCheckExecutor != null) { + aliveCheckExecutor.shutdownNow(); + } + if(clientIsInternal) { + HttpClientUtil.close(httpClient); + } + } + + /** + * Tries to query a live server. A SolrServerException is thrown if all servers are dead. + * If the request failed due to IOException then the live server is moved to dead pool and the request is + * retried on another live server. After live servers are exhausted, any servers previously marked as dead + * will be tried before failing the request. + * + * @param request the SolrRequest. + * + * @return response + * + * @throws IOException If there is a low-level I/O error. + */ + @Override + public NamedList<Object> request(final SolrRequest request) + throws SolrServerException, IOException { + Exception ex = null; + ServerWrapper[] serverList = aliveServerList; + + int maxTries = serverList.length; + Map<String,ServerWrapper> justFailed = null; + + long timeAllowedNano = getTimeAllowedInNanos(request); + long timeOutTime = System.nanoTime() + timeAllowedNano; + for (int attempts=0; attempts<maxTries; attempts++) { + if(isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + int count = counter.incrementAndGet() & Integer.MAX_VALUE; + ServerWrapper wrapper = serverList[count % serverList.length]; + wrapper.lastUsed = System.currentTimeMillis(); + + try { + return wrapper.client.request(request); + } catch (SolrException e) { + // Server is alive but the request was malformed or invalid + throw e; + } catch (SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + moveAliveToDead(wrapper); + if (justFailed == null) justFailed = new HashMap<>(); + justFailed.put(wrapper.getKey(), wrapper); + } else { + throw e; + } + } catch (Exception e) { + throw new 
SolrServerException(e); + } + } + + // try other standard servers that we didn't try just now + for (ServerWrapper wrapper : zombieServers.values()) { + if(isTimeExceeded(timeAllowedNano, timeOutTime)) { + break; + } + + if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue; + try { + NamedList<Object> rsp = wrapper.client.request(request); + // remove from zombie list *before* adding to alive to avoid a race that could lose a server + zombieServers.remove(wrapper.getKey()); + addToAlive(wrapper); + return rsp; + } catch (SolrException e) { + // Server is alive but the request was malformed or invalid + throw e; + } catch (SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + // still dead + } else { + throw e; + } + } catch (Exception e) { + throw new SolrServerException(e); + } + } + + + if (ex == null) { + throw new SolrServerException("No live SolrServers available to handle this request"); + } else { + throw new SolrServerException("No live SolrServers available to handle this request", ex); + } + } + + /** + * @return time allowed in nanos, returns -1 if no time_allowed is specified. + */ + private long getTimeAllowedInNanos(final SolrRequest req) { + SolrParams reqParams = req.getParams(); + return reqParams == null ? -1 : + TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS); + } + + private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) { + return timeAllowedNano > 0 && System.nanoTime() > timeOutTime; + } + + /** + * Takes up one dead server and check for aliveness. The check is done in a roundrobin. 
Each server is checked for + * aliveness once in 'x' millis where x is decided by the setAliveCheckInterval() or it is defaulted to 1 minute + * + * @param zombieServer a server in the dead pool + */ + private void checkAZombieServer(ServerWrapper zombieServer) { + long currTime = System.currentTimeMillis(); + try { + zombieServer.lastChecked = currTime; + QueryResponse resp = zombieServer.client.query(solrQuery); + if (resp.getStatus() == 0) { + // server has come back up. + // make sure to remove from zombies before adding to alive to avoid a race condition + // where another thread could mark it down, move it back to zombie, and then we delete + // from zombie and lose it forever. + ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey()); + if (wrapper != null) { + wrapper.failedPings = 0; + if (wrapper.standard) { + addToAlive(wrapper); + } + } else { + // something else already moved the server from zombie to alive + } + } + } catch (Exception e) { + //Expected. The server is still down. + zombieServer.failedPings++; + + // If the server doesn't belong in the standard set belonging to this load balancer + // then simply drop it after a certain number of failed pings. + if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) { + zombieServers.remove(zombieServer.getKey()); + } + } + } + + private void moveAliveToDead(ServerWrapper wrapper) { + wrapper = removeFromAlive(wrapper.getKey()); + if (wrapper == null) + return; // another thread already detected the failure and removed it + zombieServers.put(wrapper.getKey(), wrapper); + startAliveCheckExecutor(); + } + + private int interval = CHECK_INTERVAL; + + /** + * LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is alive. 
Use this to set that + * interval + * + * @param interval time in milliseconds + */ + public void setAliveCheckInterval(int interval) { + if (interval <= 0) { + throw new IllegalArgumentException("Alive check interval must be " + + "positive, specified value = " + interval); + } + this.interval = interval; + } + + private void startAliveCheckExecutor() { + // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor + // if it's not null. + if (aliveCheckExecutor == null) { + synchronized (this) { + if (aliveCheckExecutor == null) { + aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor( + new SolrjNamedThreadFactory("aliveCheckExecutor")); + aliveCheckExecutor.scheduleAtFixedRate( + getAliveCheckRunner(new WeakReference<>(this)), + this.interval, this.interval, TimeUnit.MILLISECONDS); + } + } + } + } + + private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrClient> lbRef) { + return new Runnable() { + @Override + public void run() { + LBHttpSolrClient lb = lbRef.get(); + if (lb != null && lb.zombieServers != null) { + for (ServerWrapper zombieServer : lb.zombieServers.values()) { + lb.checkAZombieServer(zombieServer); + } + } + } + }; + } + + /** + * Return the HttpClient this instance uses. + */ + public HttpClient getHttpClient() { + return httpClient; + } + + public ResponseParser getParser() { + return parser; + } + + /** + * Changes the {@link ResponseParser} that will be used for the internal + * SolrServer objects. + * + * @param parser Default Response Parser chosen to parse the response if the parser + * were not specified as part of the request. + * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser() + */ + public void setParser(ResponseParser parser) { + this.parser = parser; + } + + /** + * Changes the {@link RequestWriter} that will be used for the internal + * SolrServer objects. + * + * @param requestWriter Default RequestWriter, used to encode requests sent to the server. 
+ */ + public void setRequestWriter(RequestWriter requestWriter) { + this.requestWriter = requestWriter; + } + + public RequestWriter getRequestWriter() { + return requestWriter; + } + + @Override + protected void finalize() throws Throwable { + try { + if(this.aliveCheckExecutor!=null) + this.aliveCheckExecutor.shutdownNow(); + } finally { + super.finalize(); + } + } + + // defaults + private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks + private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java new file mode 100644 index 0000000..ee28241 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.client.solrj.impl; + +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.ResponseParser; + +import java.net.MalformedURLException; + +/** + * @deprecated Use {@link org.apache.solr.client.solrj.impl.LBHttpSolrClient} + */ +@Deprecated +public class LBHttpSolrServer extends LBHttpSolrClient { + + public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException { + super(solrServerUrls); + } + + public LBHttpSolrServer(HttpClient httpClient, String... solrServerUrl) { + super(httpClient, solrServerUrl); + } + + public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) { + super(httpClient, parser, solrServerUrl); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/NoOpResponseParser.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/NoOpResponseParser.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/NoOpResponseParser.java new file mode 100644 index 0000000..267dd25 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/NoOpResponseParser.java @@ -0,0 +1,83 @@ +package org.apache.solr.client.solrj.impl; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.commons.io.IOUtils; +import org.apache.solr.client.solrj.ResponseParser; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.NamedList; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringWriter; + +/** + * Simply puts the entire response into an entry in a NamedList. + * This parser isn't parse response into a QueryResponse. + */ +public class NoOpResponseParser extends ResponseParser { + + private String writerType = "xml"; + + public NoOpResponseParser() { + } + + public NoOpResponseParser(String writerType) { + this.writerType = writerType; + } + + @Override + public String getWriterType() { + return writerType; + } + + public void setWriterType(String writerType) { + this.writerType = writerType; + } + + @Override + public NamedList<Object> processResponse(Reader reader) { + try { + StringWriter writer = new StringWriter(); + IOUtils.copy(reader, writer); + String output = writer.toString(); + NamedList<Object> list = new NamedList<>(); + list.add("response", output); + return list; + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e); + } + } + + @Override + public NamedList<Object> processResponse(InputStream body, String encoding) { + try { + StringWriter writer = new StringWriter(); + IOUtils.copy(body, writer, encoding); + String output = writer.toString(); + NamedList<Object> list = new NamedList<>(); + list.add("response", output); + return list; + } catch (IOException e) { 
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e); + } + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java new file mode 100644 index 0000000..14be1f3 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.solr.client.solrj.StreamingResponseCallback; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.DataInputInputStream; +import org.apache.solr.common.util.JavaBinCodec; +import org.apache.solr.common.util.NamedList; + +/** + * A BinaryResponseParser that sends callback events rather then build + * a large response + * + * + * @since solr 4.0 + */ +public class StreamingBinaryResponseParser extends BinaryResponseParser { + final StreamingResponseCallback callback; + + public StreamingBinaryResponseParser( StreamingResponseCallback cb ) + { + this.callback = cb; + } + + @Override + public NamedList<Object> processResponse(InputStream body, String encoding) { + try { + JavaBinCodec codec = new JavaBinCodec() { + + @Override + public SolrDocument readSolrDocument(DataInputInputStream dis) throws IOException { + SolrDocument doc = super.readSolrDocument(dis); + callback.streamSolrDocument( doc ); + return null; + } + + @Override + public SolrDocumentList readSolrDocumentList(DataInputInputStream dis) throws IOException { + SolrDocumentList solrDocs = new SolrDocumentList(); + List list = (List) readVal(dis); + solrDocs.setNumFound((Long) list.get(0)); + solrDocs.setStart((Long) list.get(1)); + solrDocs.setMaxScore((Float) list.get(2)); + + callback.streamDocListInfo( + solrDocs.getNumFound(), + solrDocs.getStart(), + solrDocs.getMaxScore() ); + + // Read the Array + tagByte = dis.readByte(); + if( (tagByte >>> 5) != (ARR >>> 5) ) { + throw new RuntimeException( "doclist must have an array" ); + } + int sz = readSize(dis); + for (int i = 0; i < sz; i++) { + // must be a SolrDocument + readVal( dis ); + } + return solrDocs; + } + }; + + return (NamedList<Object>) codec.unmarshal(body); + } + catch 
(IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e); + } + } +}
