On Sat, 2013-01-05 at 15:56 -0800, Ken Krugler wrote:
> On Jan 5, 2013, at 3:31pm, vigna wrote:
> 
> > On 5 Jan 2013, at 3:10 PM, Ken Krugler <[email protected]> wrote:
> > 
> >> So on a large box (e.g. 24 more powerful cores) I could see using upward
> >> of 10K threads being the optimal number.
> > 
> > We are working to make 20-30K connections work on 64 cores.
> > 
> >> Just FYI about two years ago we were using big servers with lots of
> >> threads during a large-scale web crawl, and we did run into interesting
> >> bottlenecks in HttpClient 4.0.1 (?) with lots of simultaneous threads.
> >> I haven't had to revisit those issues with a recent release, so maybe
> >> those have been resolved.
> > 
> > 
> > Can you elaborate on that? I guess it would be priceless knowledge :).
> 
> 1. CookieStore access
> 
> > For example, during a Bixo crawl with 300 threads, I was doing regular 
> > thread dumps and inspecting the results. A very high percentage (typically
> > more than 1/3) were blocked while waiting to get access to the cookie store. By
> > default there's only one of these per HttpClient.
> > 
> > This one was fairly easy to work around, by creating a cookie store in the 
> > local context for each request:
> > 
> >            CookieStore cookieStore = new BasicCookieStore();
> >            localContext.setAttribute(ClientContext.COOKIE_STORE, cookieStore);
> 
> 2. Scheme registry
> 
> > But I've run into a few other synchronized method/data bottlenecks, which 
> > I'm still working through. For example, at irregular intervals the bulk of 
> > my fetcher threads are blocked on getting the scheme registry
> 
> I believe this one has been fixed via the patch for 
> https://issues.apache.org/jira/browse/HTTPCLIENT-903, and is in the current 
> release of HttpClient.
> 

Ken,

You might want to have a look at the latest code in SVN trunk (to be
released as 4.3). Several classes such as the scheme registry that
previously had to be synchronized in order to ensure thread safety have
been replaced with immutable equivalents. There is also now a way to
create HttpClient in a minimal configuration without authentication,
state management (cookies), proxy support and other non-essential
functions. These functions are not merely disabled but physically
removed from the processing pipeline, which should result in somewhat
better performance in high thread contention scenarios, as the only
synchronization point involved in request execution would be the lock of
the connection pool. Minimal HttpClient may be particularly useful for
anonymous web crawling when authentication and state management are not
required.
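
To illustrate why an immutable replacement removes a synchronization point,
here is a minimal sketch using only the JDK. It is not HttpClient code: the
class ImmutableSchemeMap and its methods are hypothetical names chosen for
this example, and a scheme is reduced to a name and a default port.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a scheme registry: scheme name -> default port.
final class ImmutableSchemeMap {

    private final Map<String, Integer> ports;

    // The source map is copied once and never modified afterwards,
    // so lookups need no lock.
    ImmutableSchemeMap(final Map<String, Integer> source) {
        this.ports = Collections.unmodifiableMap(new HashMap<String, Integer>(source));
    }

    // Lock-free lookup; returns -1 for an unknown scheme.
    int defaultPort(final String scheme) {
        Integer port = this.ports.get(scheme);
        return port != null ? port.intValue() : -1;
    }

    // "Changing" the registry builds a new instance and leaves this one untouched.
    ImmutableSchemeMap with(final String scheme, final int port) {
        Map<String, Integer> copy = new HashMap<String, Integer>(this.ports);
        copy.put(scheme, Integer.valueOf(port));
        return new ImmutableSchemeMap(copy);
    }
}

class ImmutableSchemeMapDemo {
    public static void main(String[] args) {
        Map<String, Integer> seed = new HashMap<String, Integer>();
        seed.put("http", Integer.valueOf(80));
        ImmutableSchemeMap base = new ImmutableSchemeMap(seed);
        ImmutableSchemeMap extended = base.with("https", 443);
        System.out.println("http  -> " + extended.defaultPort("http"));
        System.out.println("https -> " + extended.defaultPort("https"));
    }
}
```

Because the field is final and the map is never mutated after construction,
any number of threads can call defaultPort() concurrently without blocking
each other.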


> 3. Global lock on connection pool
> 
> Oleg had written:
> 
> > Yes, your observation is correct. The problem is that the connection
> > pool is guarded by a global lock. Naturally if you have 400 threads
> > trying to obtain a connection at about the same time all of them end up
> > contending for one lock. The problem is that I can't think of a
> > different way to ensure the max limits (per route and total) are
> > guaranteed not to be exceeded. If anyone can think of a better algorithm
> > please do let me know. What might be a possibility is creating a more
> > lenient and less prone to lock contention issues implementation that may
> > under stress occasionally allocate a few more connections than the max
> > limits.
> 
> I don't know if this has been resolved. My work-around from a few years ago 
> was to rely on having multiple Hadoop reducers running on the server (each in 
> their own JVM), where I could then limit each JVM to at most 300 connections.
> 
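
For illustration only, the "more lenient" limit described in the quoted
message could be sketched roughly as below, using only the JDK. The class
LenientLimit is a hypothetical name and this is not code from HttpClient. It
tracks a single total limit; a per-route limit would need one such counter
per route.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical soft limit: no lock, but the maximum can be exceeded under contention.
final class LenientLimit {

    private final int max;
    private final AtomicInteger leased = new AtomicInteger();

    LenientLimit(final int max) {
        this.max = max;
    }

    // The check and the increment are two separate atomic steps. Several
    // threads may pass the check at the same moment, so the count can
    // briefly overshoot max by a few connections.
    boolean tryAcquire() {
        if (this.leased.get() >= this.max) {
            return false;
        }
        this.leased.incrementAndGet();
        return true;
    }

    void release() {
        this.leased.decrementAndGet();
    }

    int leased() {
        return this.leased.get();
    }
}

class LenientLimitDemo {
    public static void main(String[] args) {
        LenientLimit limit = new LenientLimit(2);
        System.out.println(limit.tryAcquire());
        System.out.println(limit.tryAcquire());
        System.out.println(limit.tryAcquire());
        limit.release();
        System.out.println(limit.tryAcquire());
    }
}
```

Replacing the check-then-increment with a compareAndSet loop would make the
total limit strict again without a lock, at the cost of retries under
contention.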

I experimented with the idea of a lock-less (unlimited) connection manager
but in my tests it did not perform any better than the standard
connection manager.

I am attaching the source code of my experimental connection manager.
Feel free to improve on it and see if it produces better results for your
particular application.
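
As a simplified picture of the per-route idea, here is a JDK-only sketch.
It is not the attached class: PerRoutePool is a hypothetical name,
connections are modelled as plain strings, and it swaps the attachment's
synchronized LinkedList per route for a ConcurrentLinkedDeque, so treat it
as a variation rather than a summary of the attachment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical unlimited pool: one idle queue per route, no global lock, no max limits.
final class PerRoutePool {

    private final ConcurrentMap<String, ConcurrentLinkedDeque<String>> idleByRoute =
            new ConcurrentHashMap<String, ConcurrentLinkedDeque<String>>();
    private final AtomicLong counter = new AtomicLong();

    // Reuses the most recently released connection for the route,
    // or creates a new one if none is idle.
    String lease(final String route) {
        String conn = idleFor(route).pollFirst();
        if (conn == null) {
            conn = route + "#" + this.counter.getAndIncrement();
        }
        return conn;
    }

    // Returns a connection to the front of its route's idle queue.
    void release(final String route, final String conn) {
        idleFor(route).addFirst(conn);
    }

    // Creates the per-route queue on first use; putIfAbsent resolves races.
    private ConcurrentLinkedDeque<String> idleFor(final String route) {
        ConcurrentLinkedDeque<String> idle = this.idleByRoute.get(route);
        if (idle == null) {
            ConcurrentLinkedDeque<String> created = new ConcurrentLinkedDeque<String>();
            idle = this.idleByRoute.putIfAbsent(route, created);
            if (idle == null) {
                idle = created;
            }
        }
        return idle;
    }
}

class PerRoutePoolDemo {
    public static void main(String[] args) {
        PerRoutePool pool = new PerRoutePool();
        String first = pool.lease("example.org");
        pool.release("example.org", first);
        String second = pool.lease("example.org");
        System.out.println(first + " reused: " + first.equals(second));
    }
}
```

Threads working on different routes never touch the same queue, so the only
shared state is the route map and the id counter.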

Oleg

> HTH,
> 
> -- Ken
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 
> 
> 
> 
> 

/*
 * ====================================================================
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.http.impl.conn;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.annotation.ThreadSafe;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.OperatedClientConnection;
import org.apache.http.impl.conn.DefaultClientConnectionOperator;
import org.apache.http.impl.conn.SchemeRegistryFactory;
import org.apache.http.pool.ConnFactory;
import org.apache.http.pool.ConnPool;
import org.apache.http.pool.PoolEntry;
import org.apache.http.conn.DnsResolver;

/**
 * @since 4.2
 */
@ThreadSafe
public class UnlimitedPoolingClientConnectionManager implements ClientConnectionManager {

    private final Log log = LogFactory.getLog(getClass());

    private final SchemeRegistry schemeRegistry;

    private final HttpUnlimitedConnPool pool;

    private final ClientConnectionOperator operator;

    /** the custom-configured DNS lookup mechanism. */
    private final DnsResolver dnsResolver;

    public UnlimitedPoolingClientConnectionManager(final SchemeRegistry schreg) {
        this(schreg, -1, TimeUnit.MILLISECONDS);
    }

    public UnlimitedPoolingClientConnectionManager(final SchemeRegistry schreg, final DnsResolver dnsResolver) {
        this(schreg, -1, TimeUnit.MILLISECONDS, dnsResolver);
    }

    public UnlimitedPoolingClientConnectionManager() {
        this(SchemeRegistryFactory.createDefault());
    }

    public UnlimitedPoolingClientConnectionManager(
            final SchemeRegistry schemeRegistry,
            final long timeToLive, final TimeUnit tunit) {
        this(schemeRegistry, timeToLive, tunit, new SystemDefaultDnsResolver());
    }

    public UnlimitedPoolingClientConnectionManager(final SchemeRegistry schemeRegistry,
                final long timeToLive, final TimeUnit tunit,
                final DnsResolver dnsResolver) {
        super();
        if (schemeRegistry == null) {
            throw new IllegalArgumentException("Scheme registry may not be null");
        }
        if (dnsResolver == null) {
            throw new IllegalArgumentException("DNS resolver may not be null");
        }
        this.schemeRegistry = schemeRegistry;
        this.dnsResolver  = dnsResolver;
        this.operator = createConnectionOperator(schemeRegistry);
        this.pool = new HttpUnlimitedConnPool(this.log, timeToLive, tunit);
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            shutdown();
        } finally {
            super.finalize();
        }
    }

    /**
     * Hook for creating the connection operator.
     * It is called by the constructor.
     * Derived classes can override this method to change the
     * instantiation of the operator.
     * The default implementation here instantiates
     * {@link DefaultClientConnectionOperator DefaultClientConnectionOperator}.
     *
     * @param schreg    the scheme registry.
     *
     * @return  the connection operator to use
     */
    protected ClientConnectionOperator createConnectionOperator(SchemeRegistry schreg) {
        return new DefaultClientConnectionOperator(schreg, this.dnsResolver);
    }

    public SchemeRegistry getSchemeRegistry() {
        return this.schemeRegistry;
    }

    private String format(final HttpRoute route, final Object state) {
        StringBuilder buf = new StringBuilder();
        buf.append("[route: ").append(route).append("]");
        if (state != null) {
            buf.append("[state: ").append(state).append("]");
        }
        return buf.toString();
    }

    private String format(final HttpPoolEntry entry) {
        StringBuilder buf = new StringBuilder();
        buf.append("[id: ").append(entry.getId()).append("]");
        buf.append("[route: ").append(entry.getRoute()).append("]");
        Object state = entry.getState();
        if (state != null) {
            buf.append("[state: ").append(state).append("]");
        }
        return buf.toString();
    }

    public ClientConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        if (route == null) {
            throw new IllegalArgumentException("HTTP route may not be null");
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state));
        }
        final Future<HttpPoolEntry> future = this.pool.lease(route, state);

        return new ClientConnectionRequest() {

            public void abortRequest() {
                future.cancel(true);
            }

            public ManagedClientConnection getConnection(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }

        };

    }

    ManagedClientConnection leaseConnection(
            final Future<HttpPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
        HttpPoolEntry entry;
        try {
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            if (entry.getConnection() == null) {
                throw new IllegalStateException("Pool entry with no connection");
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry));
            }
            return new ManagedClientConnectionImpl(this, this.operator, entry);
        } catch (ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            this.log.error("Unexpected exception leasing connection from pool", cause);
            // Should never happen
            throw new InterruptedException();
        } catch (TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection");
        }
    }

    public void releaseConnection(
            final ManagedClientConnection conn, final long keepalive, final TimeUnit tunit) {

        if (!(conn instanceof ManagedClientConnectionImpl)) {
            throw new IllegalArgumentException
                ("Connection class mismatch, " +
                 "connection not obtained from this manager.");
        }
        ManagedClientConnectionImpl managedConn = (ManagedClientConnectionImpl) conn;
        if (managedConn.getManager() != this) {
            throw new IllegalStateException("Connection not obtained from this manager.");
        }

        synchronized (managedConn) {
            HttpPoolEntry entry = managedConn.detach();
            if (entry == null) {
                return;
            }
            try {
                if (managedConn.isOpen() && !managedConn.isMarkedReusable()) {
                    try {
                        managedConn.shutdown();
                    } catch (IOException iox) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("I/O exception shutting down released connection", iox);
                        }
                    }
                }
                entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
                if (this.log.isDebugEnabled()) {
                    String s;
                    if (keepalive > 0) {
                        s = "for " + keepalive + " " + tunit;
                    } else {
                        s = "indefinitely";
                    }
                    this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                }
            } finally {
                this.pool.release(entry, managedConn.isMarkedReusable());
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection released: " + format(entry));
            }
        }
    }

    public void shutdown() {
        this.log.debug("Connection manager is shutting down");
        try {
            this.pool.shutdown();
        } catch (IOException ex) {
            this.log.debug("I/O exception shutting down connection manager", ex);
        }
        this.log.debug("Connection manager shut down");
    }

    public void closeIdleConnections(long idleTimeout, TimeUnit tunit) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
        }
        this.pool.closeIdle(idleTimeout, tunit);
    }

    public void closeExpiredConnections() {
        this.log.debug("Closing expired connections");
        this.pool.closeExpired();
    }

    static abstract class AbstractUnlimitedConnPool<T, C, E extends PoolEntry<T, C>> implements ConnPool<T, E> {

        private final ConnFactory<T, C> connFactory;
        private final Map<T, LinkedList<E>> poolMap;

        private volatile boolean isShutDown;

        public AbstractUnlimitedConnPool(final ConnFactory<T, C> connFactory) {
            super();
            if (connFactory == null) {
                throw new IllegalArgumentException("Connection factory may not be null");
            }
            this.connFactory = connFactory;
            this.poolMap = new ConcurrentHashMap<T, LinkedList<E>>();
        }

        /**
         * Creates a new entry for the given connection with the given route.
         */
        protected abstract E createEntry(T route, C conn);

        public boolean isShutdown() {
            return this.isShutDown;
        }

        /**
         * Shuts down the pool.
         */
        public void shutdown() throws IOException {
            if (this.isShutDown) {
                return;
            }
            this.isShutDown = true;
            synchronized (this.poolMap) {
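                for (Map.Entry<T, LinkedList<E>> e: this.poolMap.entrySet()) {
                    LinkedList<E> pool = e.getValue();
                    // lease() and release() guard each per-route list with its own
                    // monitor, so take the same monitor before draining the list
                    synchronized (pool) {
                        for (;;) {
                            E entry = pool.poll();
                            if (entry != null) {
                                entry.close();
                            } else {
                                break;
                            }
                        }
                    }
                }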
                this.poolMap.clear();
            }
        }

        private LinkedList<E> getPool(final T route) {
            synchronized (this.poolMap) {
                LinkedList<E> pool = this.poolMap.get(route);
                if (pool == null) {
                    pool = new LinkedList<E>();
                    this.poolMap.put(route, pool);
                }
                return pool;
            }        
        }

        private E getFree(final LinkedList<E> pool, final Object state) {
            if (!pool.isEmpty()) {
                if (state != null) {
                    Iterator<E> it = pool.iterator();
                    while (it.hasNext()) {
                        E entry = it.next();
                        if (state.equals(entry.getState())) {
                            it.remove();
                            return entry;
                        }
                    }
                }
                Iterator<E> it = pool.iterator();
                while (it.hasNext()) {
                    E entry = it.next();
                    if (entry.getState() == null) {
                        it.remove();
                        return entry;
                    }
                }
            }
            return null;
        }
        
        public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
            if (route == null) {
                throw new IllegalArgumentException("Route may not be null");
            }
            LinkedList<E> pool = getPool(route);
            synchronized (pool) {
                if (this.isShutDown) {
                    throw new IllegalStateException("Connection pool shut down");
                }
                E entry;
                for (;;) {
                    entry = getFree(pool, state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                        pool.remove(entry);
                    } else {
                        break;
                    }
                }
                BasicFuture<E> future = new BasicFuture<E>(callback);
                try {
                    if (entry == null) {
                        C conn = this.connFactory.create(route);
                        entry = createEntry(route, conn);
                    }
                    future.completed(entry);
                } catch (IOException ex) {
                    future.failed(ex);
                }
                return future;
            }
        }

        public Future<E> lease(final T route, final Object state) {
            return lease(route, state, null);
        }

        public void release(E entry, boolean reusable) {
            T route = entry.getRoute();
            LinkedList<E> pool = getPool(route);
            synchronized (pool) {
                if (!reusable || this.isShutDown) {
                    entry.close();
                }
                if (!entry.isClosed()) {
                    pool.addFirst(entry);
                }
            }
        }

        /**
         * Closes connections that have been idle longer than the given period
         * of time and evicts them from the pool.
         *
         * @param idletime maximum idle time.
         * @param tunit time unit.
         */
        public void closeIdle(long idletime, final TimeUnit tunit) {
            if (tunit == null) {
                throw new IllegalArgumentException("Time unit must not be null.");
            }
            long time = tunit.toMillis(idletime);
            if (time < 0) {
                time = 0;
            }
            long deadline = System.currentTimeMillis() - time;
            synchronized (this.poolMap) {
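                for (Map.Entry<T, LinkedList<E>> e: this.poolMap.entrySet()) {
                    LinkedList<E> pool = e.getValue();
                    // lease() and release() guard each per-route list with its own
                    // monitor, so take the same monitor before mutating the list
                    synchronized (pool) {
                        for (Iterator<E> it = pool.iterator(); it.hasNext(); ) {
                            E entry = it.next();
                            if (entry.getUpdated() <= deadline) {
                                it.remove();
                                entry.close();
                            }
                        }
                    }
                }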
            }
        }

        /**
         * Closes expired connections and evicts them from the pool.
         */
        public void closeExpired() {
            long now = System.currentTimeMillis();
            synchronized (this.poolMap) {
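                for (Map.Entry<T, LinkedList<E>> e: this.poolMap.entrySet()) {
                    LinkedList<E> pool = e.getValue();
                    // lease() and release() guard each per-route list with its own
                    // monitor, so take the same monitor before mutating the list
                    synchronized (pool) {
                        for (Iterator<E> it = pool.iterator(); it.hasNext(); ) {
                            E entry = it.next();
                            if (entry.isExpired(now)) {
                                it.remove();
                                entry.close();
                            }
                        }
                    }
                }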
            }
        }

    }

    private static final AtomicLong COUNTER = new AtomicLong();

    static class HttpUnlimitedConnPool extends AbstractUnlimitedConnPool<HttpRoute, OperatedClientConnection, HttpPoolEntry> {

        private final Log log;
        private final long timeToLive;
        private final TimeUnit tunit;

        public HttpUnlimitedConnPool(final Log log,
                final long timeToLive, final TimeUnit tunit) {
            super(new InternalConnFactory());
            this.log = log;
            this.timeToLive = timeToLive;
            this.tunit = tunit;
        }

        @Override
        protected HttpPoolEntry createEntry(final HttpRoute route, final OperatedClientConnection conn) {
            String id = Long.toString(COUNTER.getAndIncrement());
            return new HttpPoolEntry(this.log, id, route, conn, this.timeToLive, this.tunit);
        }

    }
    
    static class InternalConnFactory implements ConnFactory<HttpRoute, OperatedClientConnection> {

        public OperatedClientConnection create(final HttpRoute route) throws IOException {
            return new DefaultClientConnection();
        }

    }

}

