http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java new file mode 100644 index 0000000..2887a50 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java @@ -0,0 +1,337 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.base.Supplier; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stats; +import com.twitter.common.stats.StatsProvider; + +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and + * connection choice to a supplied strategy. + * + * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in + * use. 
+ * + * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> { + + private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName()); + + private final Set<S> leasedConnections = + Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap()); + private final Set<S> availableConnections = Sets.newHashSet(); + private final Lock poolLock; + private final Condition available; + + private final ConnectionFactory<S> connectionFactory; + private final Executor executor; + + private volatile boolean closed; + private final AtomicLong connectionsCreated; + private final AtomicLong connectionsDestroyed; + private final AtomicLong connectionsReturned; + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}. + * + * @param connectionFactory Factory to create and destroy connections. + */ + public ConnectionPool(ConnectionFactory<S> connectionFactory) { + this(connectionFactory, Stats.STATS_PROVIDER); + } + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections and uses the supplied StatsProvider to register stats with. + * + * @param connectionFactory Factory to create and destroy connections. + * @param statsProvider Stats export provider. 
+ */ + public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) { + this(Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("CP-" + connectionFactory + "[%d]") + .setDaemon(true) + .build()), + new ReentrantLock(true), connectionFactory, statsProvider); + } + + @VisibleForTesting + ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory, + StatsProvider statsProvider) { + Preconditions.checkNotNull(executor); + Preconditions.checkNotNull(poolLock); + Preconditions.checkNotNull(connectionFactory); + Preconditions.checkNotNull(statsProvider); + + this.executor = executor; + this.poolLock = poolLock; + available = poolLock.newCondition(); + this.connectionFactory = connectionFactory; + + String cfName = Stats.normalizeName(connectionFactory.toString()); + statsProvider.makeGauge("cp_leased_connections_" + cfName, + new Supplier<Integer>() { + @Override public Integer get() { + return leasedConnections.size(); + } + }); + statsProvider.makeGauge("cp_available_connections_" + cfName, + new Supplier<Integer>() { + @Override public Integer get() { + return availableConnections.size(); + } + }); + this.connectionsCreated = + statsProvider.makeCounter("cp_created_connections_" + cfName); + this.connectionsDestroyed = + statsProvider.makeCounter("cp_destroyed_connections_" + cfName); + this.connectionsReturned = + statsProvider.makeCounter("cp_returned_connections_" + cfName); + } + + @Override + public String toString() { + return "CP-" + connectionFactory; + } + + @Override + public S get() throws ResourceExhaustedException, TimeoutException { + checkNotClosed(); + poolLock.lock(); + try { + return leaseConnection(NO_TIMEOUT); + } finally { + poolLock.unlock(); + } + } + + @Override + public S get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + + checkNotClosed(); + Preconditions.checkNotNull(timeout); + if (timeout.getValue() == 0) { + return 
get(); + } + + try { + long start = System.nanoTime(); + long timeBudgetNs = timeout.as(Time.NANOSECONDS); + if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) { + try { + timeBudgetNs -= (System.nanoTime() - start); + return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS)); + } finally { + poolLock.unlock(); + } + } else { + throw new TimeoutException("Timed out waiting for pool lock"); + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted waiting for pool lock"); + } + } + + private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException, + TimeoutException { + S connection = getConnection(timeout); + if (connection == null) { + throw new ResourceExhaustedException("Connection pool resources exhausted"); + } + return leaseConnection(connection); + } + + @Override + public void release(S connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. + * @see ObjectPool#remove(Object) + */ + @Override + public void remove(S connection) { + release(connection, true); + } + + // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create + // connection - reason about this and potentially submit release to our executor + private void release(S connection, boolean remove) { + poolLock.lock(); + try { + if (!leasedConnections.remove(connection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + + if (!closed && !remove && connection.isValid()) { + addConnection(connection); + connectionsReturned.incrementAndGet(); + } else { + connectionFactory.destroy(connection); + connectionsDestroyed.incrementAndGet(); + } + } finally { + poolLock.unlock(); + } + } + + @Override + public void close() { + poolLock.lock(); + try { + for (S availableConnection : availableConnections) { + connectionFactory.destroy(availableConnection); + } + } finally { + 
closed = true; + poolLock.unlock(); + } + } + + private void checkNotClosed() { + Preconditions.checkState(!closed); + } + + private S leaseConnection(S connection) { + leasedConnections.add(connection); + return connection; + } + + // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be + // fixed but there may be no need - do gedankanalysis + private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException, + TimeoutException { + if (availableConnections.isEmpty()) { + if (leasedConnections.isEmpty()) { + // Completely empty pool + try { + return createConnection(timeout); + } catch (Exception e) { + throw new ResourceExhaustedException("failed to create a new connection", e); + } + } else { + // If the pool is allowed to grow - let the connection factory race a release + if (connectionFactory.mightCreate()) { + executor.execute(new Runnable() { + @Override public void run() { + try { + // The connection timeout is not needed here to honor the callers get requested + // timeout, but we don't want to have an infinite timeout which could exhaust a + // thread pool over many backgrounded create calls + S connection = createConnection(timeout); + if (connection != null) { + addConnection(connection); + } else { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " + + "due to maximum pool size or timeout"); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e); + } + } + }); + } + + try { + // We wait for a returned/new connection here in loops to guard against the + // "spurious wakeups" that are documented can occur with Condition.await() + if (timeout.getValue() == 0) { + while(availableConnections.isEmpty()) { + available.await(); + } + } else { + long timeRemainingNs = timeout.as(Time.NANOSECONDS); + while(availableConnections.isEmpty()) { + long start = System.nanoTime(); + if 
(!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) { + throw new TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } else { + timeRemainingNs -= (System.nanoTime() - start); + } + } + if (availableConnections.isEmpty()) { + throw new TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted while waiting for a connection."); + } + } + } + + return getAvailableConnection(); + } + + private S getAvailableConnection() { + S connection = (availableConnections.size() == 1) + ? Iterables.getOnlyElement(availableConnections) + : availableConnections.iterator().next(); + if (!availableConnections.remove(connection)) { + throw new IllegalArgumentException("Connection picked not in pool: " + connection); + } + return connection; + } + + private S createConnection(Amount<Long, Time> timeout) throws Exception { + S connection = connectionFactory.create(timeout); + if (connection != null) { + connectionsCreated.incrementAndGet(); + } + return connection; + } + + private void addConnection(S connection) { + poolLock.lock(); + try { + availableConnections.add(connection); + available.signal(); + } finally { + poolLock.unlock(); + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java new file mode 100644 index 0000000..4c20604 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java @@ -0,0 +1,79 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.base.Command; + +/** + * A host set that can be monitored for changes. + * + * @param <T> The type that is used to identify members of the host set. + */ +public interface DynamicHostSet<T> { + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. 
+ * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @throws MonitorException if there is a problem monitoring the host set + * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)} + */ + @Deprecated + public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException; + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @return A command which, when executed, will stop monitoring the host set. + * @throws MonitorException if there is a problem monitoring the host set + */ + public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException; + + /** + * An interface to an object that is interested in receiving notification whenever the host set + * changes. + */ + public static interface HostChangeMonitor<T> { + + /** + * Called when either the available set of services changes (when a service dies or a new + * instance comes on-line) or when an existing service advertises a status or health change. 
+ * + * @param hostSet the current set of available ServiceInstances + */ + void onChange(ImmutableSet<T> hostSet); + } + + public static class MonitorException extends Exception { + public MonitorException(String msg) { + super(msg); + } + + public MonitorException(String msg, Throwable cause) { + super(msg, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java new file mode 100644 index 0000000..e9cc0f0 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java @@ -0,0 +1,52 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.base.Command; + +import static com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; +import static com.twitter.common.net.pool.DynamicHostSet.MonitorException; + +/** + * Utility methods for dealing with dynamic sets of hosts. + */ +public final class DynamicHostSetUtil { + + /** + * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of + * the underlying actual endpoints. + * + * @param hostSet The hostSet to snapshot. + * @throws MonitorException if there was a problem obtaining the snapshot. + */ + public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws MonitorException { + final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder(); + Command unwatch = hostSet.watch(new HostChangeMonitor<T>() { + @Override public void onChange(ImmutableSet<T> hostSet) { + snapshot.addAll(hostSet); + } + }); + unwatch.execute(); + return snapshot.build(); + } + + private DynamicHostSetUtil() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java new file mode 100644 index 0000000..dc9aa21 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java @@ -0,0 +1,172 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.net.loadbalancing.LoadBalancer; +import com.twitter.common.net.pool.DynamicHostSet.MonitorException; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +/** + * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a + * {@link com.twitter.common.zookeeper.ServerSet}. + * + * @param <H> The type that contains metadata information about hosts, such as liveness and address. + * @param <T> The raw connection type that is being pooled. 
+ * @param <E> The type that identifies the endpoint of the pool, such as an address. + * @author John Sirois + */ +public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> { + + private final MetaPool<T, E> pool; + + /** + * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools + * for the given {@code serverSet}. + * + * @param hostSet the dynamic set of available servers to pool connections for + * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint + * @param loadBalancer Load balancer to manage request flow. + * @param onBackendsChosen A callback to notify of chosen backends. + * @param restoreInterval the interval after connection errors start occurring for a target to + * begin checking to see if it has come back to a healthy state + * @param endpointExtractor Function that transforms a service instance into an endpoint instance. + * @param livenessChecker Filter that will determine whether a host indicates itself as available. 
+ * @throws MonitorException if there is a problem monitoring the host set + */ + public DynamicPool(DynamicHostSet<H> hostSet, + Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory, + LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, + Amount<Long, Time> restoreInterval, + Function<H, E> endpointExtractor, + Predicate<H> livenessChecker) + throws DynamicHostSet.MonitorException { + Preconditions.checkNotNull(hostSet); + Preconditions.checkNotNull(endpointPoolFactory); + + pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval); + + // TODO(John Sirois): consider an explicit start/stop + hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor, + livenessChecker) { + @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, + Map<E, ObjectPool<Connection<T, E>>> livePools) { + poolRebuilt(deadPools, livePools); + } + }); + } + + @VisibleForTesting + void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, + Map<E, ObjectPool<Connection<T, E>>> livePools) { + + pool.setBackends(livePools); + + for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) { + deadTargetPool.close(); + } + } + + @Override + public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { + return pool.get(); + } + + @Override + public Connection<T, E> get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + return pool.get(timeout); + } + + @Override + public void release(Connection<T, E> connection) { + pool.release(connection); + } + + @Override + public void remove(Connection<T, E> connection) { + pool.remove(connection); + } + + @Override + public void close() { + pool.close(); + } + + private abstract class PoolMonitor<H, S extends Connection<?, ?>> + implements DynamicHostSet.HostChangeMonitor<H> { + + private final Function<E, ObjectPool<S>> endpointPoolFactory; + private final Function<H, E> endpointExtractor; 
+ private final Predicate<H> livenessTest; + + public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory, + Function<H, E> endpointExtractor, + Predicate<H> livenessTest) { + this.endpointPoolFactory = endpointPoolFactory; + this.endpointExtractor = endpointExtractor; + this.livenessTest = livenessTest; + } + + private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap(); + + @Override + public synchronized void onChange(ImmutableSet<H> serverSet) { + // TODO(John Sirois): change onChange to pass the delta data since its already computed by + // ServerSet + + Map<E, H> newEndpoints = + Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor); + + Set<E> deadEndpoints = ImmutableSet.copyOf( + Sets.difference(endpointPools.keySet(), newEndpoints.keySet())); + Set<ObjectPool<S>> deadPools = Sets.newHashSet(); + for (E endpoint : deadEndpoints) { + ObjectPool<S> deadPool = endpointPools.remove(endpoint); + deadPools.add(deadPool); + } + + Set<E> addedEndpoints = ImmutableSet.copyOf( + Sets.difference(newEndpoints.keySet(), endpointPools.keySet())); + for (E endpoint : addedEndpoints) { + ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint); + endpointPools.put(endpoint, endpointPool); + } + + onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools)); + } + + protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools, + Map<E, ObjectPool<S>> livePools); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java new file mode 100644 index 0000000..fb97632 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java @@ -0,0 +1,343 @@ +// 
================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import com.twitter.common.base.Closure; +import com.twitter.common.base.Command; +import com.twitter.common.net.loadbalancing.LoadBalancer; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + 
+/** + * A connection pool that picks connections from a set of backend pools. Backend pools are selected + * from randomly initially but then as they are used they are ranked according to how many + * connections they have available and whether or not the last used connection had an error or not. + * In this way, backends that are responsive should get selected in preference to those that are + * not. + * + * <p>Non-responsive backends are monitored after a configurable period in a background thread and + * if a connection can be obtained they start to float back up in the rankings. In this way, + * backends that are initially non-responsive but later become responsive should end up getting + * selected. + * + * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> { + + private final Command stopBackendRestorer; + + private Map<E, ObjectPool<Connection<T, E>>> backends = null; + + // Locks to guard mutation of the backends set. + private final Lock backendsReadLock; + private final Lock backendsWriteLock; + + private final Closure<Collection<E>> onBackendsChosen; + + private final LoadBalancer<E> loadBalancer; + + /** + * Creates a connection pool with no backends. Backends may be added post-creation by calling + * {@link #setBackends(java.util.Map)} + * + * @param loadBalancer the load balancer to distribute requests among backends. 
+ * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool(LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { + this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer, + onBackendsChosen, restoreInterval); + } + + /** + * Creates a connection pool that balances connections across multiple backend pools. + * + * @param backends the connection pools for the backends + * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param loadBalancer the load balancer to distribute requests among backends. + * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool( + ImmutableMap<E, ObjectPool<Connection<T, E>>> backends, + LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { + + this.loadBalancer = Preconditions.checkNotNull(loadBalancer); + this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen); + + ReadWriteLock backendsLock = new ReentrantReadWriteLock(true); + backendsReadLock = backendsLock.readLock(); + backendsWriteLock = backendsLock.writeLock(); + + setBackends(backends); + + Preconditions.checkNotNull(restoreInterval); + Preconditions.checkArgument(restoreInterval.getValue() > 0); + stopBackendRestorer = startDeadBackendRestorer(restoreInterval); + } + + /** + * Assigns the backend pools that this pool should draw from. + * + * @param pools New pools to use. 
+ */ + public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) { + backendsWriteLock.lock(); + try { + backends = Preconditions.checkNotNull(pools); + loadBalancer.offerBackends(pools.keySet(), onBackendsChosen); + } finally { + backendsWriteLock.unlock(); + } + } + + private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) { + + final AtomicBoolean shouldRestore = new AtomicBoolean(true); + Runnable restoreDeadBackends = new Runnable() { + @Override public void run() { + if (shouldRestore.get()) { + restoreDeadBackends(restoreInterval); + } + } + }; + final ScheduledExecutorService scheduledExecutorService = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("MTCP-DeadBackendRestorer[%s]") + .build()); + long restoreDelay = restoreInterval.getValue(); + scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay, + restoreDelay, restoreInterval.getUnit().getTimeUnit()); + + return new Command() { + @Override public void execute() { + shouldRestore.set(false); + scheduledExecutorService.shutdownNow(); + LOG.info("Backend restorer shut down"); + } + }; + } + + private static final Logger LOG = Logger.getLogger(MetaPool.class.getName()); + + private void restoreDeadBackends(Amount<Long, Time> restoreInterval) { + for (E backend : snapshotBackends()) { + ObjectPool<Connection<T, E>> pool; + backendsReadLock.lock(); + try { + pool = backends.get(backend); + } finally { + backendsReadLock.unlock(); + } + + // We can lose a race if the backends change - and that's fine, we'll restore the new set of + // backends in the next scheduled restoration run. 
+ if (pool != null) { + try { + release(get(backend, pool, restoreInterval)); + } catch (TimeoutException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } catch (ResourceExhaustedException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } + } + } + } + + private Iterable<E> snapshotBackends() { + backendsReadLock.lock(); + try { + return ImmutableList.copyOf(backends.keySet()); + } finally { + backendsReadLock.unlock(); + } + } + + @Override + public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { + return get(ObjectPool.NO_TIMEOUT); + } + + @Override + public Connection<T, E> get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + + E backend; + ObjectPool<Connection<T, E>> pool; + + backendsReadLock.lock(); + try { + backend = loadBalancer.nextBackend(); + Preconditions.checkNotNull(backend, "Load balancer gave a null backend."); + + pool = backends.get(backend); + Preconditions.checkNotNull(backend, + "Given backend %s not found in tracked backends: %s", backend, backends); + } finally { + backendsReadLock.unlock(); + } + + return get(backend, pool, timeout); + } + + private static class ManagedConnection<T, E> implements Connection<T, E> { + private final Connection<T, E> connection; + private final ObjectPool<Connection<T, E>> pool; + + private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) { + this.connection = connection; + this.pool = pool; + } + + @Override + public void close() { + connection.close(); + } + + @Override + public T get() { + return connection.get(); + } + + @Override + public boolean isValid() { + return connection.isValid(); + } + + @Override + public E getEndpoint() { + return connection.getEndpoint(); + } + + @Override public String toString() { + return "ManagedConnection[" + connection.toString() + "]"; + } + + void release(boolean remove) { + if 
(remove) { + pool.remove(connection); + } else { + pool.release(connection); + } + } + } + + private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool, + Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException { + + long startNanos = System.nanoTime(); + try { + Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout); + + // BEWARE: We have leased a connection from the underlying pool here and must return it to the + // caller so they can later release it. If we fail to do so, the connection will leak. + // Catching intermediate exceptions ourselves and pro-actively returning the connection to the + // pool before re-throwing is not a viable option since the return would have to succeed, + // forcing us to ignore the timeout passed in. + + // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire + // a (backend, pool) pair atomically that has since been removed, we can safely let the lb + // know about backend events and it will just ignore us. + + try { + loadBalancer.connected(backend, System.nanoTime() - startNanos); + } catch (RuntimeException e) { + LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after " + + "leasing a connection - continuing", e); + } + return new ManagedConnection<T, E>(connection, pool); + } catch (TimeoutException e) { + loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT); + throw e; + } catch (ResourceExhaustedException e) { + loadBalancer.connectFailed(backend, ConnectionResult.FAILED); + throw e; + } + } + + @Override + public void release(Connection<T, E> connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. 
+ * @see ObjectPool#remove(Object) + */ + @Override + public void remove(Connection<T, E> connection) { + release(connection, true); + } + + private void release(Connection<T, E> connection, boolean remove) { + backendsWriteLock.lock(); + try { + + if (!(connection instanceof ManagedConnection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + ((ManagedConnection) connection).release(remove); + + loadBalancer.released(connection.getEndpoint()); + } finally { + backendsWriteLock.unlock(); + } + } + + @Override + public void close() { + stopBackendRestorer.execute(); + + backendsWriteLock.lock(); + try { + for (ObjectPool<Connection<T, E>> backend : backends.values()) { + backend.close(); + } + } finally { + backendsWriteLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java new file mode 100644 index 0000000..63bf788 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java @@ -0,0 +1,85 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import java.util.concurrent.TimeoutException; + +/** + * A generic object pool that provides object of a given type for exclusive use by the caller. + * Object pools generally pool expensive resources and so offer a {@link #close} method that should + * be used to free these resources when the pool is no longer needed. + * + * @author John Sirois + */ +public interface ObjectPool<T> { + + /** + * Gets a resource potentially blocking for as long as it takes to either create a new one or wait + * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release} + * the connection when they are done with it. + * + * @return a resource for exclusive use by the caller + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + * @throws TimeoutException if we timed out while trying to fetch a resource + */ + T get() throws ResourceExhaustedException, TimeoutException; + + /** + * A convenience constant representing a no timeout. + */ + Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS); + + /** + * Gets a resource; timing out if there are none available and it takes longer than specified to + * create a new one or wait for one to be {@link #release(Object) released}. 
Callers must + * {@link #release (Object) release} the connection when they are done with it. + * + * @param timeout the maximum amount of time to wait + * @return a resource for exclusive use by the caller + * @throws TimeoutException if the specified timeout was reached before a resource became + * available + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + */ + T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException; + + /** + * Releases a resource obtained from this pool back into the pool of available resources. It is an + * error to release a resource not obtained from this pool. + * + * @param resource Resource to release. + */ + void release(T resource); + + /** + * Removes a resource obtained from this pool from its available resources. It is an error to + * remove a resource not obtained from this pool. + * + * @param resource Resource to remove. + */ + void remove(T resource); + + /** + * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects + * when they are released. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java new file mode 100644 index 0000000..f2c38ec --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java @@ -0,0 +1,30 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
/**
 * Checked exception signalling that a pool could not provide a resource because it was
 * exhausted.
 *
 * @author John Sirois
 */
public class ResourceExhaustedException extends Exception {

  /**
   * Creates an exception with a detail message and the underlying cause.
   *
   * @param message description of the exhaustion
   * @param cause the failure that led to this exception
   */
  public ResourceExhaustedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates an exception with a detail message and no cause.
   *
   * @param message description of the exhaustion
   */
  public ResourceExhaustedException(String message) {
    super(message);
  }
}
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.objectsize; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Sets; + +/** + * Contains utility methods for calculating the memory usage of objects. It + * only works on the HotSpot JVM, and infers the actual memory layout (32 bit + * vs. 64 bit word size, compressed object pointers vs. uncompressed) from + * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. 
+ * It can only make an educated guess at whether compressed OOPs are used, + * though; specifically, it knows what the JVM's default choice of OOP + * compression would be based on HotSpot version and maximum heap sizes, but if + * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line + * switch, it can not detect + * this fact and will report incorrect sizes, as it will presume the default JVM + * behavior. + * + * @author Attila Szegedi + */ +public class ObjectSizeCalculator { + + /** + * Describes constant memory overheads for various constructs in a JVM implementation. + */ + public interface MemoryLayoutSpecification { + + /** + * Returns the fixed overhead of an array of any type or length in this JVM. + * + * @return the fixed overhead of an array. + */ + int getArrayHeaderSize(); + + /** + * Returns the fixed overhead of for any {@link Object} subclass in this JVM. + * + * @return the fixed overhead of any object. + */ + int getObjectHeaderSize(); + + /** + * Returns the quantum field size for a field owned by an object in this JVM. + * + * @return the quantum field size for an object. + */ + int getObjectPadding(); + + /** + * Returns the fixed size of an object reference in this JVM. + * + * @return the size of all object references. + */ + int getReferenceSize(); + + /** + * Returns the quantum field size for a field owned by one of an object's ancestor superclasses + * in this JVM. + * + * @return the quantum field size for a superclass field. + */ + int getSuperclassFieldPadding(); + } + + private static class CurrentLayout { + private static final MemoryLayoutSpecification SPEC = + getEffectiveMemoryLayoutSpecification(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. Attempts to to detect the current JVM memory layout, + * but may fail with {@link UnsupportedOperationException}; + * + * @param obj the object; can be null. 
Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + * @throws UnsupportedOperationException if the current vm memory layout cannot be detected. + */ + public static long getObjectSize(Object obj) throws UnsupportedOperationException { + return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj); + } + + // Fixed object header size for arrays. + private final int arrayHeaderSize; + // Fixed object header size for non-array objects. + private final int objectHeaderSize; + // Padding for the object size - if the object size is not an exact multiple + // of this, it is padded to the next multiple. + private final int objectPadding; + // Size of reference (pointer) fields. + private final int referenceSize; + // Padding for the fields of superclass before fields of subclasses are + // added. + private final int superclassFieldPadding; + + private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos = + CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() { + public ClassSizeInfo load(Class<?> clazz) { + return new ClassSizeInfo(clazz); + } + }); + + + private final Set<Object> alreadyVisited = Sets.newIdentityHashSet(); + private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024); + private long size; + + /** + * Creates an object size calculator that can calculate object sizes for a given + * {@code memoryLayoutSpecification}. + * + * @param memoryLayoutSpecification a description of the JVM memory layout. 
+ */ + public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) { + Preconditions.checkNotNull(memoryLayoutSpecification); + arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize(); + objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize(); + objectPadding = memoryLayoutSpecification.getObjectPadding(); + referenceSize = memoryLayoutSpecification.getReferenceSize(); + superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + */ + public synchronized long calculateObjectSize(Object obj) { + // Breadth-first traversal instead of naive depth-first with recursive + // implementation, so we don't blow the stack traversing long linked lists. 
+ try { + for (;;) { + visit(obj); + if (pending.isEmpty()) { + return size; + } + obj = pending.removeFirst(); + } + } finally { + alreadyVisited.clear(); + pending.clear(); + size = 0; + } + } + + private void visit(Object obj) { + if (alreadyVisited.contains(obj)) { + return; + } + final Class<?> clazz = obj.getClass(); + if (clazz == ArrayElementsVisitor.class) { + ((ArrayElementsVisitor) obj).visit(this); + } else { + alreadyVisited.add(obj); + if (clazz.isArray()) { + visitArray(obj); + } else { + classSizeInfos.getUnchecked(clazz).visit(obj, this); + } + } + } + + private void visitArray(Object array) { + final Class<?> componentType = array.getClass().getComponentType(); + final int length = Array.getLength(array); + if (componentType.isPrimitive()) { + increaseByArraySize(length, getPrimitiveFieldSize(componentType)); + } else { + increaseByArraySize(length, referenceSize); + // If we didn't use an ArrayElementsVisitor, we would be enqueueing every + // element of the array here instead. For large arrays, it would + // tremendously enlarge the queue. In essence, we're compressing it into + // a small command object instead. This is different than immediately + // visiting the elements, as their visiting is scheduled for the end of + // the current queue. 
+ switch (length) { + case 0: { + break; + } + case 1: { + enqueue(Array.get(array, 0)); + break; + } + default: { + enqueue(new ArrayElementsVisitor((Object[]) array)); + } + } + } + } + + private void increaseByArraySize(int length, long elementSize) { + increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding)); + } + + private static class ArrayElementsVisitor { + private final Object[] array; + + ArrayElementsVisitor(Object[] array) { + this.array = array; + } + + public void visit(ObjectSizeCalculator calc) { + for (Object elem : array) { + if (elem != null) { + calc.visit(elem); + } + } + } + } + + void enqueue(Object obj) { + if (obj != null) { + pending.addLast(obj); + } + } + + void increaseSize(long objectSize) { + size += objectSize; + } + + @VisibleForTesting + static long roundTo(long x, int multiple) { + return ((x + multiple - 1) / multiple) * multiple; + } + + private class ClassSizeInfo { + // Padded fields + header size + private final long objectSize; + // Only the fields size - used to calculate the subclasses' memory + // footprint. 
+ private final long fieldsSize; + private final Field[] referenceFields; + + public ClassSizeInfo(Class<?> clazz) { + long fieldsSize = 0; + final List<Field> referenceFields = new LinkedList<Field>(); + for (Field f : clazz.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + final Class<?> type = f.getType(); + if (type.isPrimitive()) { + fieldsSize += getPrimitiveFieldSize(type); + } else { + f.setAccessible(true); + referenceFields.add(f); + fieldsSize += referenceSize; + } + } + final Class<?> superClass = clazz.getSuperclass(); + if (superClass != null) { + final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass); + fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding); + referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields)); + } + this.fieldsSize = fieldsSize; + this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding); + this.referenceFields = referenceFields.toArray( + new Field[referenceFields.size()]); + } + + void visit(Object obj, ObjectSizeCalculator calc) { + calc.increaseSize(objectSize); + enqueueReferencedObjects(obj, calc); + } + + public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { + for (Field f : referenceFields) { + try { + calc.enqueue(f.get(obj)); + } catch (IllegalAccessException e) { + final AssertionError ae = new AssertionError( + "Unexpected denial of access to " + f); + ae.initCause(e); + throw ae; + } + } + } + } + + private static long getPrimitiveFieldSize(Class<?> type) { + if (type == boolean.class || type == byte.class) { + return 1; + } + if (type == char.class || type == short.class) { + return 2; + } + if (type == int.class || type == float.class) { + return 4; + } + if (type == long.class || type == double.class) { + return 8; + } + throw new AssertionError("Encountered unexpected primitive type " + + type.getName()); + } + + @VisibleForTesting + static MemoryLayoutSpecification 
getEffectiveMemoryLayoutSpecification() { + final String vmName = System.getProperty("java.vm.name"); + if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") + || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) { + throw new UnsupportedOperationException( + "ObjectSizeCalculator only supported on HotSpot VM"); + } + + final String dataModel = System.getProperty("sun.arch.data.model"); + if ("32".equals(dataModel)) { + // Running with 32-bit data model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 12; + } + @Override public int getObjectHeaderSize() { + return 8; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } else if (!"64".equals(dataModel)) { + throw new UnsupportedOperationException("Unrecognized value '" + + dataModel + "' of sun.arch.data.model system property"); + } + + final String strVmVersion = System.getProperty("java.vm.version"); + final int vmVersion = Integer.parseInt(strVmVersion.substring(0, + strVmVersion.indexOf('.'))); + if (vmVersion >= 17) { + long maxMemory = 0; + for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { + maxMemory += mp.getUsage().getMax(); + } + if (maxMemory < 30L * 1024 * 1024 * 1024) { + // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total + // for all memory pools (yes, including code cache). 
+ return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 16; + } + @Override public int getObjectHeaderSize() { + return 12; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } + } + + // In other cases, it's a 64-bit uncompressed OOPs object model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 24; + } + @Override public int getObjectHeaderSize() { + return 16; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 8; + } + @Override public int getSuperclassFieldPadding() { + return 8; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Amount.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Amount.java b/commons/src/main/java/com/twitter/common/quantity/Amount.java new file mode 100644 index 0000000..5b7b904 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Amount.java @@ -0,0 +1,211 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.quantity; + +import com.google.common.base.Preconditions; + +import com.twitter.common.collections.Pair; + +/** + * Represents a value in a unit system and facilitates unambiguous communication of amounts. + * Instances are created via static factory {@code of(...)} methods. + * + * @param <T> the type of number the amount value is expressed in + * @param <U> the type of unit that this amount quantifies + * + * @author John Sirois + */ +public abstract class Amount<T extends Number & Comparable<T>, U extends Unit<U>> + implements Comparable<Amount<T, U>> { + + /** + * Thrown when a checked operation on an amount would overflow. + */ + + public static class TypeOverflowException extends RuntimeException { + public TypeOverflowException() { + super(); + } + } + + private final Pair<T, U> amount; + private final T maxValue; + + private Amount(T value, U unit, T maxValue) { + Preconditions.checkNotNull(value); + Preconditions.checkNotNull(unit); + this.maxValue = maxValue; + this.amount = Pair.of(value, unit); + } + + public T getValue() { + return amount.getFirst(); + } + + public U getUnit() { + return amount.getSecond(); + } + + public T as(U unit) { + return asUnit(unit); + } + + /** + * Throws TypeOverflowException if an overflow occurs during scaling. 
+ */ + public T asChecked(U unit) { + T retVal = asUnit(unit); + if (retVal.equals(maxValue)) { + throw new TypeOverflowException(); + } + return retVal; + } + + private T asUnit(Unit<?> unit) { + return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier()); + } + + @Override + public int hashCode() { + return amount.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Amount)) { + return false; + } + + Amount<?, ?> other = (Amount<?, ?>) obj; + return amount.equals(other.amount) || isSameAmount(other); + } + + private boolean isSameAmount(Amount<?, ?> other) { + // Equals allows Object - so we have no compile time check that other has the right value type; + // ie: make sure they don't have Integer when we have Long. + Number value = other.getValue(); + if (!getValue().getClass().isInstance(value)) { + return false; + } + + Unit<?> unit = other.getUnit(); + if (!getUnit().getClass().isInstance(unit)) { + return false; + } + + @SuppressWarnings("unchecked") + U otherUnit = (U) other.getUnit(); + return isSameAmount(other, otherUnit); + } + + private boolean isSameAmount(Amount<?, ?> other, U otherUnit) { + // Compare in the more precise unit (the one with the lower multiplier). + if (otherUnit.multiplier() > getUnit().multiplier()) { + return getValue().equals(other.asUnit(getUnit())); + } else { + return as(otherUnit).equals(other.getValue()); + } + } + + @Override + public String toString() { + return amount.toString(); + } + + @Override + public int compareTo(Amount<T, U> other) { + // Compare in the more precise unit (the one with the lower multiplier). + if (other.getUnit().multiplier() > getUnit().multiplier()) { + return getValue().compareTo(other.as(getUnit())); + } else { + return as(other.getUnit()).compareTo(other.getValue()); + } + } + + private boolean sameUnits(Unit<? 
extends Unit<?>> unit) { + return getUnit().equals(unit); + } + + protected abstract T scale(double multiplier); + + /** + * Creates an amount that uses a {@code double} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Double, U> of(double number, U unit) { + return new Amount<Double, U>(number, unit, Double.MAX_VALUE) { + @Override protected Double scale(double multiplier) { + return getValue() * multiplier; + } + }; + } + + /** + * Creates an amount that uses a {@code float} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Float, U> of(float number, U unit) { + return new Amount<Float, U>(number, unit, Float.MAX_VALUE) { + @Override protected Float scale(double multiplier) { + return (float) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses a {@code long} value. 
+ * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Long, U> of(long number, U unit) { + return new Amount<Long, U>(number, unit, Long.MAX_VALUE) { + @Override protected Long scale(double multiplier) { + return (long) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses an {@code int} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Integer, U> of(int number, U unit) { + return new Amount<Integer, U>(number, unit, Integer.MAX_VALUE) { + @Override protected Integer scale(double multiplier) { + return (int) (getValue() * multiplier); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Data.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Data.java b/commons/src/main/java/com/twitter/common/quantity/Data.java new file mode 100644 index 0000000..de0e484 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Data.java @@ -0,0 +1,54 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.quantity; + +/** + * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s. + * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor + * of 1024 instead of 1000 as is typical in metric units. Additionally, units are divided into 2 + * hierarchies, one based on bits and the other on bytes. Thus {@link #Kb} represents kilobits, so + * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes, so 1 KB = 1024 bytes or 8192 bits. 
+ * + * @author John Sirois + */ +public enum Data implements Unit<Data> { + BITS(1), + Kb(1024, BITS), + Mb(1024, Kb), + Gb(1024, Mb), + BYTES(8, BITS), + KB(1024, BYTES), + MB(1024, KB), + GB(1024, MB), + TB(1024, GB), + PB(1024, TB); + + /** Size of this unit expressed in {@link #BITS}, the root of both hierarchies. */ + private final double multiplier; + + /** + * Creates a unit with an absolute multiplier. + * + * @param multiplier the size of the unit in bits + */ + private Data(double multiplier) { + this.multiplier = multiplier; + } + + /** + * Creates a unit defined as a multiple of a previously declared unit. + * + * @param multiplier how many {@code base} units make up one of this unit + * @param base the unit this one is defined in terms of + */ + private Data(double multiplier, Data base) { + this(multiplier * base.multiplier); + } + + /** + * Returns the size of this unit in bits. + */ + @Override + public double multiplier() { + return multiplier; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Time.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Time.java b/commons/src/main/java/com/twitter/common/quantity/Time.java new file mode 100644 index 0000000..215b0a7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Time.java @@ -0,0 +1,65 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.quantity; + +import java.util.concurrent.TimeUnit; + +/** + * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s. + * + * @author John Sirois + */ +public enum Time implements Unit<Time> { + NANOSECONDS(1, TimeUnit.NANOSECONDS, "ns"), + MICROSECONDS(1000, NANOSECONDS, TimeUnit.MICROSECONDS, "us"), + MILLISECONDS(1000, MICROSECONDS, TimeUnit.MILLISECONDS, "ms"), + SECONDS(1000, MILLISECONDS, TimeUnit.SECONDS, "secs"), + MINUTES(60, SECONDS, TimeUnit.MINUTES, "mins"), + HOURS(60, MINUTES, TimeUnit.HOURS, "hrs"), + DAYS(24, HOURS, TimeUnit.DAYS, "days"); + + /** Length of this unit expressed in {@link #NANOSECONDS}, the root of the hierarchy. */ + private final double multiplier; + /** The {@code java.util.concurrent.TimeUnit} counterpart returned by {@link #getTimeUnit()}. */ + private final TimeUnit timeUnit; + /** Abbreviation returned by {@link #toString()}. */ + private final String display; + + /** + * Creates a unit with an absolute multiplier. + * + * @param multiplier the length of the unit in nanoseconds + * @param timeUnit the equivalent {@code TimeUnit} + * @param display the abbreviation used as the string form of the unit + */ + private Time(double multiplier, TimeUnit timeUnit, String display) { + this.multiplier = multiplier; + this.timeUnit = timeUnit; + this.display = display; + } + + /** + * Creates a unit defined as a multiple of a previously declared unit. + * + * @param multiplier how many {@code base} units make up one of this unit + * @param base the unit this one is defined in terms of + * @param timeUnit the equivalent {@code TimeUnit} + * @param display the abbreviation used as the string form of the unit + */ + private Time(double multiplier, Time base, TimeUnit timeUnit, String display) { + this(multiplier * base.multiplier, timeUnit, display); + } + + /** + * Returns the length of this unit in nanoseconds. + */ + @Override + public double multiplier() { + return multiplier; + } + + /** + * Returns the equivalent {@code TimeUnit}. + */ + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Returns the abbreviation of this unit (for example {@code "ms"}) rather than the name of the + * enum constant. + */ + @Override + public String toString() { + return display; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Unit.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Unit.java b/commons/src/main/java/com/twitter/common/quantity/Unit.java new file mode 100644 index 0000000..c64067c --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Unit.java @@ -0,0 +1,36 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.quantity; + +/** + * Represents a unit hierarchy for a given unit of measure; eg: time. Instances represent specific + * units from the hierarchy; eg: seconds. + * + * @param <U> the type of the concrete unit implementation + * + * @author John Sirois + */ +public interface Unit<U extends Unit<U>> { + + /** + * Returns the weight of this unit relative to other units in the same hierarchy. Typically the + * smallest unit in the hierarchy returns 1, but this need not be the case. It is only required + * that each unit of the hierarchy return a multiplier relative to a common base unit for the + * hierarchy. + * + * @return the multiplier of this unit relative to the hierarchy's common base unit + */ + double multiplier(); +}
