Raul created HTTPCLIENT-1251:
--------------------------------
Summary: PoolingClientConnectionManager and multithreading problem
Key: HTTPCLIENT-1251
URL: https://issues.apache.org/jira/browse/HTTPCLIENT-1251
Project: HttpComponents HttpClient
Issue Type: Bug
Components: HttpConn
Affects Versions: 4.2.1
Reporter: Raul
I have developed a multithreaded application for crawling a remote web site.
The client reused by all threads is initialized in this way:
PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
int maxConnections = crawlerParameters.getIntegerParameter(CrawlerParams.CLIENT_MAX_CONNECTIONS_PARAM, 200);
cm.setMaxTotal(maxConnections);
cm.setMaxPerRoute(new HttpRoute(new HttpHost(baseUriBuilder.getHost())), maxConnections);
cm.setDefaultMaxPerRoute(maxConnections);
httpclient = new DefaultHttpClient(cm);
The requests to execute are stored in a thread-safe queue. Then a certain
number of threads (a configuration parameter) are launched. The overall execution
finishes when all threads have finished.
ConcurrentLinkedQueue<QueueElement> queue = CrawlerUtils.buildQueue(baseUrl, crawlerParameters, page, replacements);
int threadsPerPage = crawlerParameters.getIntegerParameter(CrawlerParams.CLIENT_THREADS_PER_PAGE_PARAM, 4);
FetcherThread[] threads = new FetcherThread[threadsPerPage];
final CountDownLatch countDownLatch = new CountDownLatch(threadsPerPage);
for (int i = 0; i < threadsPerPage; i++) {
    threads[i] = new FetcherThread(String.valueOf(i), type, httpclient, crawlerParameters,
            queue, page, xmlMarshaller, crawlBaseDir, countDownLatch);
}
// start the threads
for (int j = 0; j < threads.length; j++) {
    threads[j].start();
}
// wait until all threads finish
countDownLatch.await();
This is the thread class. The HttpClient, the request queue and the countdown latch
are passed as arguments to the constructor:
public class FetcherThread extends Thread {

    private transient Logger _log = LoggerFactory.getLogger(FetcherThread.class);
    private ConcurrentLinkedQueue<QueueElement> queue;
    private CrawlPage page;
    private XStreamMarshaller xmlMarshaller;
    private String id;
    private DefaultHttpClient httpclient;
    private String crawlBaseDir;
    private String type;
    private CountDownLatch countDownLatch;
    private CrawlerParams crawlerParameters;

    public FetcherThread(final String id, String type, final DefaultHttpClient httpclient,
            final CrawlerParams crawlParameters, final ConcurrentLinkedQueue<QueueElement> queue,
            final CrawlPage page, final XStreamMarshaller xmlMarshaller, final String crawlBaseDir,
            final CountDownLatch countDownLatch) {
        this.queue = queue;
        this.page = page;
        this.xmlMarshaller = xmlMarshaller;
        this.id = id;
        this.httpclient = httpclient;
        this.crawlBaseDir = crawlBaseDir;
        this.type = type;
        this.countDownLatch = countDownLatch;
        this.crawlerParameters = crawlParameters;
        super.setName(this.page.getName() + "_" + id);
    }
    /**
     * Executes the GetMethod and prints some status information.
     */
    @Override
    public void run() {
        try {
            if (_log.isInfoEnabled()) {
                _log.info("Thread request {} started.", this.getName());
            }
            while (true) {
                QueueElement element = queue.poll();
                if (element == null) break;
                processElement(element);
            }
        } finally {
            countDownLatch.countDown();
        }
    }
    private void processElement(QueueElement element) {
        try {
            // Process
            if (_log.isDebugEnabled()) {
                _log.debug("Request executor thread {}, Starting request {}",
                        this.getName(), element.getRequest().getURI().toString());
            }
            StopWatch totalwatch = new StopWatch();
            StopWatch watch = new StopWatch();
            totalwatch.start();
            watch.start();
            processRequest(element, new BasicHttpContext(), crawlerParameters);
            watch.stop();
            long duration = watch.getLastTaskTimeMillis();
            Integer minDurationStr = crawlerParameters.getIntegerParameter(CrawlerParams.CLIENT_MIN_REQUEST_PROCESS_DURATION_PARAM);
            if (minDurationStr != null) {
                Long minDuration = Long.valueOf(minDurationStr);
                if (duration < minDuration) {
                    FetcherThread.sleep(minDuration - duration);
                }
            }
            totalwatch.stop();
            if (_log.isDebugEnabled()) {
                _log.debug("Request executor thread {}, request {} --> performed in "
                        + totalwatch.getLastTaskTimeMillis() + " millis", id, element.getId());
            }
        } catch (Exception e) {
            _log.error("Exception: {}", e);
            element.getRequest().abort();
        } finally {
            element.getRequest().releaseConnection();
        }
    }
The request-processing method (processRequest) only executes the request and reads the content:
HttpResponse response = httpclient.execute(element.getRequest(),httpContext);
String content = readEntity(response);
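For reference, readEntity is not shown in this report. A minimal sketch of what such a helper typically looks like with HttpClient 4.2 is below; the method body is an assumption, not the actual code:

// Hypothetical sketch of the readEntity(...) helper referenced above.
// Requires org.apache.http.HttpEntity, org.apache.http.HttpResponse and
// org.apache.http.util.EntityUtils. EntityUtils.toString(...) fully consumes
// the entity, which lets the pooled connection be reused safely.
private String readEntity(final HttpResponse response) throws IOException {
    HttpEntity entity = response.getEntity();
    if (entity == null) {
        return null;
    }
    return EntityUtils.toString(entity);
}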
The problem is that for one request I receive the response of another
request.
The correct way should be
request A ---> response A
request B ---> response B
But when I configure more than one thread polling requests from the queue:
request A ---> response B
request B ---> response A
If I configure only one fetcher thread, everything works correctly.
I tried with one context per thread and with one context per request, but got the
same wrong result.
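For clarity, a minimal sketch of the "one context per thread" variant that was tried is below; the field name threadContext is hypothetical, and the per-request variant corresponds to the new BasicHttpContext() already shown in processElement:

// Hypothetical illustration of "one context per thread": the context is
// created once per FetcherThread instance and reused for every request
// polled by that thread (requires org.apache.http.protocol.HttpContext
// and org.apache.http.protocol.BasicHttpContext).
private final HttpContext threadContext = new BasicHttpContext();

private void processElement(QueueElement element) {
    // ... same as above, but passing the per-thread context
    processRequest(element, threadContext, crawlerParameters);
    // ...
}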