You might want to try increasing the maxTotal and defaultMaxPerRoute settings on the connection pool used by PoolingClientConnectionManager: the defaults are 20 and 2 respectively, and because all of your threads are making requests to the same HTTP route, at most 2 connections can be leased at any one time while every other thread waits for one to be released.
Also, I don't think it has anything to do with the behavior you are seeing, but it looks like the executorService you create is never actually used as you start each Worker thread manually? On Oct 15, 2013, at 6:17 AM, Ke Ren <[email protected]> wrote: > yes, that was with http client 4.2.5. I tried to see if there is any > different with 4.3. The following is the code with 4.2.5 > > import concurrent.{ExecutionContext, Await, Future, Promise} > import scala.concurrent.duration._ > import java.util.concurrent.Executors > import org.apache.http.impl.client.{DefaultHttpClient, HttpClients} > import org.apache.http.util.EntityUtils > import org.apache.http.conn.scheme.{PlainSocketFactory, Scheme, > SchemeRegistry} > import org.apache.http.conn.ssl.SSLSocketFactory > import org.apache.http.impl.conn.PoolingClientConnectionManager > import org.apache.http.params.SyncBasicHttpParams > import org.apache.http.client.methods.HttpGet > > object MainTest { > def main(args: Array[String]) { > var num = 100 > var jobs = 1000 > if (args.size == 2) { > num = args(0).toInt > jobs = args(1).toInt > } > val executorService = Executors.newCachedThreadPool() > implicit val ec = ExecutionContext.fromExecutorService(executorService) > val schemeRegistry = new SchemeRegistry(); > schemeRegistry.register(new Scheme("http", 80, > PlainSocketFactory.getSocketFactory())); > schemeRegistry.register(new Scheme("https", 443, > SSLSocketFactory.getSocketFactory())); > val mgr = new PoolingClientConnectionManager(schemeRegistry); > val params = new SyncBasicHttpParams(); > val httpclient = new DefaultHttpClient(mgr, params); > > var workers = List.empty[Worker] > var futures = List.empty[Future[Boolean]] > for (i <- 0 to num) { > val p = Promise[Boolean] > workers = workers.::(new Worker(httpclient, p, jobs)) > futures = futures.::(p.future) > } > > val allF = futures.tail.foldLeft(futures.head) { > case (future, rowFuture) => future.flatMap { > status => > rowFuture.map(finished => status && 
finished) > } > } > > workers foreach { > worker => > worker.start() > } > > Await.result(allF, 1 hour) > println("finished") > } > } > > class Worker(httpclient: DefaultHttpClient, p: Promise[Boolean], jobs: Int) > extends Thread { > override def run { > for (i <- 0 to jobs) { > try { > var httpget = new HttpGet("xxxx") > val response = httpclient.execute(httpget); > try { > val entity = response.getEntity(); > if (entity != null) { > EntityUtils.toByteArray(entity) > } > } finally { > response.close(); > httpclient.close() > } > } catch { > case e: Throwable => > e.printStackTrace() > } > } > p.success(true) > } > } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
