Updated Branches: refs/heads/MARMOTTA-406 [created] fedcc159a
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fedcc159/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java index afda0df..18b2b44 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java @@ -17,32 +17,65 @@ */ package org.apache.marmotta.platform.core.services.http; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; + import net.sf.ehcache.Ehcache; -import org.apache.http.*; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.ProtocolException; +import org.apache.http.RequestLine; import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpClient; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.ResponseHandler; import org.apache.http.client.cache.CacheResponseStatus; +import org.apache.http.client.cache.HttpCacheContext; import org.apache.http.client.cache.HttpCacheStorage; -import org.apache.http.client.methods.*; -import org.apache.http.client.params.ClientPNames; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.conn.ClientConnectionManager; -import org.apache.http.conn.scheme.PlainSocketFactory; -import org.apache.http.conn.scheme.Scheme; -import org.apache.http.conn.scheme.SchemeRegistry; -import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.cache.CacheConfig; -import org.apache.http.impl.client.cache.CachingHttpClient; +import org.apache.http.impl.client.cache.CachingHttpClientBuilder; import org.apache.http.impl.client.cache.ehcache.EhcacheHttpCacheStorage; -import org.apache.http.impl.conn.PoolingClientConnectionManager; -import org.apache.http.params.*; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.params.HttpParams; import org.apache.http.pool.PoolStats; -import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.apache.marmotta.platform.core.api.config.ConfigurationService; @@ -58,20 +91,6 @@ import org.apache.marmotta.platform.core.services.http.response.StatusCodeRespon import org.apache.marmotta.platform.core.services.http.response.StringBodyResponseHandler; import org.slf4j.Logger; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Observes; -import javax.enterprise.inject.Instance; -import javax.inject.Inject; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** **/ @ApplicationScoped @@ -103,9 +122,9 @@ public class HttpClientServiceImpl implements HttpClientService { @MarmottaCache("http-client-cache") private Instance<Ehcache> ehcache; - private HttpClient httpClient; + private CloseableHttpClient httpClient; private IdleConnectionMonitorThread idleConnectionMonitorThread; - private BasicHttpParams httpParams; +// private BasicHttpParams httpParams; private AtomicLong bytesSent = new AtomicLong(); private AtomicLong bytesReceived = new AtomicLong(); @@ -139,11 +158,11 @@ public class HttpClientServiceImpl implements HttpClientService { } /** - * Get a ready-to-use {@link HttpClient}. + * Get a ready-to-use {@link CloseableHttpClient}. */ @Override - public HttpClient getHttpClient() { - return new ReadLockHttpClient(); + public CloseableHttpClient getHttpClient() { + return new ReadLockHttpClient(httpClient); } /** @@ -274,51 +293,48 @@ public class HttpClientServiceImpl implements HttpClientService { try { lock.writeLock().lock(); - httpParams = new BasicHttpParams(); + final HttpClientBuilder clientBuilder; + if (configurationService.getBooleanConfiguration("core.http.client_cache_enable", true)) { + CacheConfig cacheConfig = CacheConfig.custom() + // FIXME: Hardcoded constants - is this useful? + .setMaxCacheEntries(1000) + .setMaxObjectSize(81920) + .build(); + + final HttpCacheStorage cacheStore = new EhcacheHttpCacheStorage(ehcache.get(), cacheConfig); + clientBuilder = CachingHttpClientBuilder.create() + .setCacheConfig(cacheConfig) + .setHttpCacheStorage(cacheStore); + } else { + clientBuilder = HttpClients.custom(); + } String userAgentString = "Apache Marmotta/" + configurationService.getStringConfiguration("kiwi.version") + " (running at " + configurationService.getServerUri() + ")" + " lmf-core/" + configurationService.getStringConfiguration("kiwi.version"); - userAgentString = configurationService.getStringConfiguration("core.http.user_agent", userAgentString); - - httpParams.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, configurationService.getIntConfiguration("core.http.so_timeout", 60000)); - httpParams.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, - configurationService.getIntConfiguration("core.http.connection_timeout", 10000)); - - httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, true); - httpParams.setIntParameter(ClientPNames.MAX_REDIRECTS, 3); + clientBuilder.setUserAgent(configurationService.getStringConfiguration("core.http.user_agent", userAgentString)); - SchemeRegistry schemeRegistry = new SchemeRegistry(); - schemeRegistry.register(new Scheme("http", 80, PlainSocketFactory.getSocketFactory())); - schemeRegistry.register(new Scheme("https", 443, SSLSocketFactory.getSocketFactory())); + clientBuilder.setDefaultRequestConfig(RequestConfig.custom() + .setSocketTimeout(configurationService.getIntConfiguration("core.http.so_timeout", 60000)) + .setConnectTimeout(configurationService.getIntConfiguration("core.http.connection_timeout", 10000)) + .setRedirectsEnabled(true) + .setMaxRedirects(3) + .build()); - PoolingClientConnectionManager cm = new PoolingClientConnectionManager(schemeRegistry); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); cm.setMaxTotal(configurationService.getIntConfiguration("core.http.max_connections", 20)); cm.setDefaultMaxPerRoute(configurationService.getIntConfiguration("core.http.max_connections_per_route", 10)); + clientBuilder.setConnectionManager(cm); + + clientBuilder.setRedirectStrategy(new LMFRedirectStrategy()); + clientBuilder.setRetryHandler(new LMFHttpRequestRetryHandler()); - final DefaultHttpClient hc = new DefaultHttpClient(cm, httpParams); - hc.setRedirectStrategy(new LMFRedirectStrategy()); - hc.setHttpRequestRetryHandler(new LMFHttpRequestRetryHandler()); - hc.removeRequestInterceptorByClass(org.apache.http.protocol.RequestUserAgent.class); - hc.addRequestInterceptor(new LMFRequestUserAgent(userAgentString)); - - if (configurationService.getBooleanConfiguration("core.http.client_cache_enable", true)) { - CacheConfig cacheConfig = new CacheConfig(); - // FIXME: Hardcoded constants - is this useful? - cacheConfig.setMaxCacheEntries(1000); - cacheConfig.setMaxObjectSize(81920); - - final HttpCacheStorage cacheStore = new EhcacheHttpCacheStorage(ehcache.get(), cacheConfig); - - this.httpClient = new MonitoredHttpClient(new CachingHttpClient(hc, cacheStore, cacheConfig)); - } else { - this.httpClient = new MonitoredHttpClient(hc); - } + this.httpClient = new MonitoredHttpClient(clientBuilder.build()); bytesSent.set(0); bytesReceived.set(0); requestsExecuted.set(0); - idleConnectionMonitorThread = new IdleConnectionMonitorThread(httpClient.getConnectionManager()); + idleConnectionMonitorThread = new IdleConnectionMonitorThread(cm); idleConnectionMonitorThread.start(); StatisticsProvider stats = new StatisticsProvider(cm); @@ -334,41 +350,16 @@ public class HttpClientServiceImpl implements HttpClientService { lock.writeLock().lock(); statisticsService.unregisterModule(HttpClientService.class.getSimpleName()); idleConnectionMonitorThread.shutdown(); - httpClient.getConnectionManager().shutdown(); + try { + httpClient.close(); + } catch (IOException e) { + log.warn("Exception while closing HttpClient: {}", e.getMessage()); + } } finally { lock.writeLock().unlock(); } } - private static class LMFRequestUserAgent implements HttpRequestInterceptor { - - private final String baseUserAgent; - - public LMFRequestUserAgent(String baseUserAgent) { - this.baseUserAgent = baseUserAgent; - } - - private final String buildUserAgentString(String localPart) { - if (localPart == null || localPart.length() == 0) - return baseUserAgent; - else if (localPart.endsWith(baseUserAgent)) return localPart; - return localPart + " " + baseUserAgent; - } - - @Override - public void process(HttpRequest request, HttpContext context) throws HttpException, IOException { - if (request == null) throw new IllegalArgumentException("HTTP request must not be null"); - if (!request.containsHeader(HTTP.USER_AGENT)) { - String useragent = HttpProtocolParams.getUserAgent(request.getParams()); - request.addHeader(HTTP.USER_AGENT, buildUserAgentString(useragent)); - } else { - String useragent = request.getFirstHeader(HTTP.USER_AGENT).getValue(); - request.setHeader(HTTP.USER_AGENT, buildUserAgentString(useragent)); - } - } - - } - private static class LMFRedirectStrategy extends DefaultRedirectStrategy { @Override public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException { @@ -420,10 +411,10 @@ public class HttpClientServiceImpl implements HttpClientService { private static class IdleConnectionMonitorThread extends Thread { - private final ClientConnectionManager connMgr; + private final HttpClientConnectionManager connMgr; private volatile boolean shutdown; - public IdleConnectionMonitorThread(ClientConnectionManager connMgr) { + public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) { super("HttpClientService Idle Connection Manager"); this.connMgr = connMgr; setDaemon(true); @@ -459,9 +450,9 @@ public class HttpClientServiceImpl implements HttpClientService { private class StatisticsProvider implements StatisticsModule { private boolean enabled; - private PoolingClientConnectionManager connectionManager; + private PoolingHttpClientConnectionManager connectionManager; - public StatisticsProvider(PoolingClientConnectionManager cm) { + public StatisticsProvider(PoolingHttpClientConnectionManager cm) { this.connectionManager = cm; enabled = true; } @@ -519,169 +510,54 @@ public class HttpClientServiceImpl implements HttpClientService { } - private class ReadLockHttpClient implements HttpClient { + private class ReadLockHttpClient extends CloseableHttpClient { - private HttpParams params; + private final CloseableHttpClient delegate; - public ReadLockHttpClient() { - this.params = new DefaultedHttpParams(new BasicHttpParams(), httpParams); + public ReadLockHttpClient(CloseableHttpClient delegate) { + this.delegate = delegate; } @Override + @Deprecated public HttpParams getParams() { - return params; + return delegate.getParams(); } @Override + @Deprecated public ClientConnectionManager getConnectionManager() { - return httpClient.getConnectionManager(); - } - - @Override - public HttpResponse execute(HttpUriRequest request) throws IOException, ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(request); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public HttpResponse execute(HttpUriRequest request, HttpContext context) throws IOException, ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(request, context); - } finally { - lock.readLock().unlock(); - } + return delegate.getConnectionManager(); } @Override - public HttpResponse execute(HttpHost target, HttpRequest request) throws IOException, ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(target, request); - } finally { - lock.readLock().unlock(); - } + public void close() throws IOException { + // Nop; this Client is read only! } @Override - public HttpResponse execute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException { + protected CloseableHttpResponse doExecute(HttpHost target, + HttpRequest request, HttpContext context) throws IOException, + ClientProtocolException { lock.readLock().lock(); try { - return httpClient.execute(target, request, context); + return delegate.execute(target, request, context); } finally { - lock.readLock().unlock(); + lock.readLock().lock(); } } - @Override - public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(request, responseHandler); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) throws IOException, - ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(request, responseHandler, context); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, - ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(target, request, responseHandler); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) - throws IOException, ClientProtocolException { - lock.readLock().lock(); - try { - return httpClient.execute(target, request, responseHandler, context); - } finally { - lock.readLock().unlock(); - } - } } - protected class MonitoredHttpClient implements HttpClient { + protected class MonitoredHttpClient extends CloseableHttpClient { - private final HttpClient delegate; + private final CloseableHttpClient delegate; - public MonitoredHttpClient(HttpClient delegate) { + public MonitoredHttpClient(CloseableHttpClient delegate) { this.delegate = delegate; } - @Override - public HttpParams getParams() { - return delegate.getParams(); - } - - @Override - public ClientConnectionManager getConnectionManager() { - return delegate.getConnectionManager(); - } - - @Override - public HttpResponse execute(HttpUriRequest request) throws IOException, ClientProtocolException { - final Task task = preProcess(request); - - final HttpResponse response; - lock.readLock().lock(); - try { - response = delegate.execute(request); - } catch (ClientProtocolException cpe) { - task.endTask(); - throw cpe; - } catch (IOException io) { - task.endTask(); - throw io; - } finally { - lock.readLock().unlock(); - } - - return postProcess(response, null, task); - } - - @Override - public HttpResponse execute(HttpUriRequest request, HttpContext context) throws IOException, ClientProtocolException { - final Task task = preProcess(request); - - final HttpResponse response; - lock.readLock().lock(); - try { - response = delegate.execute(request, context); - } catch (ClientProtocolException cpe) { - task.endTask(); - throw cpe; - } catch (IOException io) { - task.endTask(); - throw io; - } finally { - lock.readLock().unlock(); - } - - return postProcess(response, context, task); - } - private Task preProcess(HttpRequest request) { final RequestLine rl = request.getRequestLine(); final String taskName = String.format("%S %s %S", rl.getMethod(), rl.getUri(), request.getProtocolVersion()); @@ -702,14 +578,15 @@ public class HttpClientServiceImpl implements HttpClientService { return task; } - private HttpResponse postProcess(final HttpResponse response, HttpContext context, final Task task) { + private CloseableHttpResponse postProcess(final CloseableHttpResponse response, HttpContext context, final Task task) { requestsExecuted.incrementAndGet(); task.resetProgress(); task.updateMessage("retrieving response"); if (response.getEntity() != null) { boolean cachedResponse; - if (context != null) { - CacheResponseStatus cacheRespStatus = (CacheResponseStatus) context.getAttribute(CachingHttpClient.CACHE_RESPONSE_STATUS); + if (context != null && context instanceof HttpCacheContext) { + HttpCacheContext cacheContext = (HttpCacheContext) context; + CacheResponseStatus cacheRespStatus = cacheContext.getCacheResponseStatus(); // To report download progress, the entity is wrapped in a MonitoredHttpEntity. cachedResponse = cacheRespStatus != null && cacheRespStatus != CacheResponseStatus.CACHE_MISS; } else { @@ -723,34 +600,29 @@ public class HttpClientServiceImpl implements HttpClientService { } @Override - public HttpResponse execute(HttpHost target, HttpRequest request) throws IOException, ClientProtocolException { - final Task task = preProcess(request); + @Deprecated + public HttpParams getParams() { + return delegate.getParams(); + } - final HttpResponse response; - lock.readLock().lock(); - try { - response = delegate.execute(target, request); - } catch (ClientProtocolException cpe) { - task.endTask(); - throw cpe; - } catch (IOException io) { - task.endTask(); - throw io; - } finally { - lock.readLock().unlock(); - } + @Override + @Deprecated + public ClientConnectionManager getConnectionManager() { + return delegate.getConnectionManager(); + } - return postProcess(response, null, task); + @Override + public void close() throws IOException { + delegate.close(); } @Override - public HttpResponse execute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException { + protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException { final Task task = preProcess(request); - final HttpResponse response; lock.readLock().lock(); try { - response = delegate.execute(target, request, context); + return postProcess(delegate.execute(target, request, context), context, task); } catch (ClientProtocolException cpe) { task.endTask(); throw cpe; @@ -760,49 +632,6 @@ public class HttpClientServiceImpl implements HttpClientService { } finally { lock.readLock().unlock(); } - - return postProcess(response, context, task); - } - - @Override - public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, ClientProtocolException { - final HttpResponse response = execute(request); - - return processResponse(responseHandler, response); - } - - private <T> T processResponse(ResponseHandler<? extends T> responseHandler, final HttpResponse response) throws ClientProtocolException, - IOException { - try { - return responseHandler.handleResponse(response); - } finally { - // Make sure everything is cleaned up properly - EntityUtils.consume(response.getEntity()); - } - } - - @Override - public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) throws IOException, - ClientProtocolException { - final HttpResponse response = execute(request, context); - - return processResponse(responseHandler, response); - } - - @Override - public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, - ClientProtocolException { - final HttpResponse response = execute(target, request); - - return processResponse(responseHandler, response); - } - - @Override - public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) - throws IOException, ClientProtocolException { - final HttpResponse response = execute(target, request, context); - - return processResponse(responseHandler, response); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fedcc159/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/prefix/PrefixCC.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/prefix/PrefixCC.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/prefix/PrefixCC.java index b231800..79272f8 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/prefix/PrefixCC.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/prefix/PrefixCC.java @@ -101,6 +101,7 @@ public class PrefixCC implements PrefixProvider { @Override public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException { + log.error("StatusCode: {}", response.getStatusLine().getStatusCode()); if (200 == response.getStatusLine().getStatusCode()) { HttpEntity entity = response.getEntity(); @@ -108,6 +109,7 @@ public class PrefixCC implements PrefixProvider { try { while (it.hasNext()) { final String l = it.next(); + log.error(": {}", l); if (l.endsWith("\t" + namespace)) { return l.substring(0, l.indexOf("\t")); }
