http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java deleted file mode 100644 index fba1e4b..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.monitoring; - -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.concurrent.GuardedBy; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.net.loadbalancing.RequestTracker; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.util.Clock; -import org.apache.aurora.common.util.concurrent.ExecutorServiceShutdown; - -/** - * Monitors activity on established connections between two hosts. This can be used for a server - * to track inbound clients, or for a client to track requests sent to different servers. - * - * The monitor will retain information for hosts that may no longer be active, but will expunge - * information for hosts that have been idle for more than five minutes. - * - * @author William Farner - */ -public class TrafficMonitor<K> implements ConnectionMonitor<K>, RequestTracker<K> { - - @VisibleForTesting - static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, Time.MINUTES); - - @GuardedBy("this") - private final LoadingCache<K, TrafficInfo> trafficInfos; - - private final String serviceName; - private final Amount<Long, Time> gcInterval; - - private AtomicLong lifetimeRequests = new AtomicLong(); - private final Clock clock; - private final ScheduledExecutorService gcExecutor; - - /** - * Creates a new traffic monitor using the default cleanup interval. - * - * @param serviceName Name of the service to monitor, used for creating variable names. - */ - public TrafficMonitor(final String serviceName) { - this(serviceName, DEFAULT_GC_INTERVAL); - } - - /** - * Creates a new traffic monitor with a custom cleanup interval. - * - * @param serviceName Service name for the monitor. - * @param gcInterval Interval on which the remote host garbage collector should run. - */ - public TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval) { - this(serviceName, gcInterval, Clock.SYSTEM_CLOCK); - } - - /** - * Convenience method to create a typed traffic monitor. - * - * @param serviceName Service name for the monitor. - * @param <T> Monitor type. - * @return A new traffic monitor. - */ - public static <T> TrafficMonitor<T> create(String serviceName) { - return new TrafficMonitor<T>(serviceName); - } - - @VisibleForTesting - TrafficMonitor(final String serviceName, Clock clock) { - this(serviceName, DEFAULT_GC_INTERVAL, clock); - } - - private TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval, Clock clock) { - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - this.clock = Preconditions.checkNotNull(clock); - Preconditions.checkNotNull(gcInterval); - Preconditions.checkArgument(gcInterval.getValue() > 0, "GC interval must be > zero."); - this.gcInterval = gcInterval; - - trafficInfos = CacheBuilder.newBuilder().build(new CacheLoader<K, TrafficInfo>() { - @Override public TrafficInfo load(K key) { - return new TrafficInfo(key); - } - }); - - Runnable gc = new Runnable() { - @Override public void run() { gc(); } - }; - - gcExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("TrafficMonitor-gc-%d").build()); - gcExecutor.scheduleAtFixedRate(gc, gcInterval.as(Time.SECONDS), gcInterval.as(Time.SECONDS), - TimeUnit.SECONDS); - } - - /** - * Gets the name of the service that this monitor is monitoring. - * - * @return Monitor's service name. - */ - public String getServiceName() { - return serviceName; - } - - /** - * Gets the total number of requests that this monitor has observed, for all remote hosts. - * - * @return Total number of requests observed. - */ - public long getLifetimeRequestCount() { - return lifetimeRequests.get(); - } - - /** - * Fetches all current traffic information. - * - * @return A map from the host key type to information about that host. - */ - public synchronized Map<K, TrafficInfo> getTrafficInfo() { - return ImmutableMap.copyOf(trafficInfos.asMap()); - } - - @Override - public synchronized void connected(K key) { - Preconditions.checkNotNull(key); - - trafficInfos.getUnchecked(key).incConnections(); - } - - @Override - public synchronized void released(K key) { - Preconditions.checkNotNull(key); - - TrafficInfo info = trafficInfos.getUnchecked(key); - - Preconditions.checkState(info.getConnectionCount() > 0, "Double release detected!"); - info.decConnections(); - } - - @Override - public void requestResult(K key, RequestResult result, long requestTimeNanos) { - Preconditions.checkNotNull(key); - - lifetimeRequests.incrementAndGet(); - trafficInfos.getUnchecked(key).addResult(result); - } - - @VisibleForTesting - synchronized void gc() { - Iterables.removeIf(trafficInfos.asMap().entrySet(), - new Predicate<Map.Entry<K, TrafficInfo>>() { - @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) { - if (clientInfo.getValue().connections.get() > 0) return false; - - long idlePeriod = clock.nowNanos() - clientInfo.getValue().getLastActiveTimestamp(); - - return idlePeriod > gcInterval.as(Time.NANOSECONDS); - } - }); - } - - /** - * Shuts down TrafficMonitor by stopping background gc task. - */ - public void shutdown() { - new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, Time.SECONDS)).execute(); - } - - /** - * Information about traffic obsserved to/from a specific host. - */ - public class TrafficInfo { - private final K key; - private AtomicInteger requestSuccesses = new AtomicInteger(); - private AtomicInteger requestFailures = new AtomicInteger(); - private AtomicInteger connections = new AtomicInteger(); - private AtomicLong lastActive = new AtomicLong(); - - TrafficInfo(K key) { - this.key = key; - pulse(); - } - - void pulse() { - lastActive.set(clock.nowNanos()); - } - - public K getKey() { - return key; - } - - void addResult(RequestResult result) { - pulse(); - switch (result) { - case SUCCESS: - requestSuccesses.incrementAndGet(); - break; - case FAILED: - case TIMEOUT: - requestFailures.incrementAndGet(); - break; - } - } - - public int getRequestSuccessCount() { - return requestSuccesses.get(); - } - - public int getRequestFailureCount() { - return requestFailures.get(); - } - - int incConnections() { - pulse(); - return connections.incrementAndGet(); - } - - int decConnections() { - pulse(); - return connections.decrementAndGet(); - } - - public int getConnectionCount() { - return connections.get(); - } - - public long getLastActiveTimestamp() { - return lastActive.get(); - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java deleted file mode 100644 index cdaaeab..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -/** - * A factory for connections that also dictates policy for the size of the connection population. - * - * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null protocol is already tangling - * implementation code - * - * @author John Sirois - */ -public interface ConnectionFactory<S extends Connection<?, ?>> { - - /** - * Checks whether this factory might create a connection if requested. - * - * @return {@code} true if this factory might create a connection at this point in time; ie - * a call to {@link #create} might not have returned {@code null}. May return true to multiple - * threads if concurrently creating connections. - */ - boolean mightCreate(); - - /** - * Attempts to create a new connection within the given timeout and subject to this factory's - * connection population size policy. - * - * @param timeout the maximum amount of time to wait - * @return a new connection or null if there are too many connections already - * @throws Exception if there was a problem creating the connection or establishing the connection - * takes too long - */ - S create(Amount<Long, Time> timeout) throws Exception; - - /** - * Destroys a connection. It is an error to attempt to destroy a connection this factory did - * not {@link #create} - * - * @param connection The connection to destroy. - */ - void destroy(S connection); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java deleted file mode 100644 index 316bf2b..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.aurora.common.base.Supplier; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.stats.StatsProvider; - -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and - * connection choice to a supplied strategy. - * - * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in - * use. - * - * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command - * - * @author John Sirois - */ -public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> { - - private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName()); - - private final Set<S> leasedConnections = - Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap()); - private final Set<S> availableConnections = Sets.newHashSet(); - private final Lock poolLock; - private final Condition available; - - private final ConnectionFactory<S> connectionFactory; - private final Executor executor; - - private volatile boolean closed; - private final AtomicLong connectionsCreated; - private final AtomicLong connectionsDestroyed; - private final AtomicLong connectionsReturned; - - /** - * Creates a connection pool with a connection picker that selects the first item in the set of - * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}. - * - * @param connectionFactory Factory to create and destroy connections. - */ - public ConnectionPool(ConnectionFactory<S> connectionFactory) { - this(connectionFactory, Stats.STATS_PROVIDER); - } - - /** - * Creates a connection pool with a connection picker that selects the first item in the set of - * available connections and uses the supplied StatsProvider to register stats with. - * - * @param connectionFactory Factory to create and destroy connections. - * @param statsProvider Stats export provider. - */ - public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) { - this(Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("CP-" + connectionFactory + "[%d]") - .setDaemon(true) - .build()), - new ReentrantLock(true), connectionFactory, statsProvider); - } - - @VisibleForTesting - ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory, - StatsProvider statsProvider) { - Preconditions.checkNotNull(executor); - Preconditions.checkNotNull(poolLock); - Preconditions.checkNotNull(connectionFactory); - Preconditions.checkNotNull(statsProvider); - - this.executor = executor; - this.poolLock = poolLock; - available = poolLock.newCondition(); - this.connectionFactory = connectionFactory; - - String cfName = Stats.normalizeName(connectionFactory.toString()); - statsProvider.makeGauge("cp_leased_connections_" + cfName, - new Supplier<Integer>() { - @Override public Integer get() { - return leasedConnections.size(); - } - }); - statsProvider.makeGauge("cp_available_connections_" + cfName, - new Supplier<Integer>() { - @Override public Integer get() { - return availableConnections.size(); - } - }); - this.connectionsCreated = - statsProvider.makeCounter("cp_created_connections_" + cfName); - this.connectionsDestroyed = - statsProvider.makeCounter("cp_destroyed_connections_" + cfName); - this.connectionsReturned = - statsProvider.makeCounter("cp_returned_connections_" + cfName); - } - - @Override - public String toString() { - return "CP-" + connectionFactory; - } - - @Override - public S get() throws ResourceExhaustedException, TimeoutException { - checkNotClosed(); - poolLock.lock(); - try { - return leaseConnection(NO_TIMEOUT); - } finally { - poolLock.unlock(); - } - } - - @Override - public S get(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - - checkNotClosed(); - Preconditions.checkNotNull(timeout); - if (timeout.getValue() == 0) { - return get(); - } - - try { - long start = System.nanoTime(); - long timeBudgetNs = timeout.as(Time.NANOSECONDS); - if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) { - try { - timeBudgetNs -= (System.nanoTime() - start); - return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS)); - } finally { - poolLock.unlock(); - } - } else { - throw new TimeoutException("Timed out waiting for pool lock"); - } - } catch (InterruptedException e) { - throw new TimeoutException("Interrupted waiting for pool lock"); - } - } - - private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException, - TimeoutException { - S connection = getConnection(timeout); - if (connection == null) { - throw new ResourceExhaustedException("Connection pool resources exhausted"); - } - return leaseConnection(connection); - } - - @Override - public void release(S connection) { - release(connection, false); - } - - /** - * Equivalent to releasing a Connection with isValid() == false. - * @see ObjectPool#remove(Object) - */ - @Override - public void remove(S connection) { - release(connection, true); - } - - // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create - // connection - reason about this and potentially submit release to our executor - private void release(S connection, boolean remove) { - poolLock.lock(); - try { - if (!leasedConnections.remove(connection)) { - throw new IllegalArgumentException("Connection not controlled by this connection pool: " - + connection); - } - - if (!closed && !remove && connection.isValid()) { - addConnection(connection); - connectionsReturned.incrementAndGet(); - } else { - connectionFactory.destroy(connection); - connectionsDestroyed.incrementAndGet(); - } - } finally { - poolLock.unlock(); - } - } - - @Override - public void close() { - poolLock.lock(); - try { - for (S availableConnection : availableConnections) { - connectionFactory.destroy(availableConnection); - } - } finally { - closed = true; - poolLock.unlock(); - } - } - - private void checkNotClosed() { - Preconditions.checkState(!closed); - } - - private S leaseConnection(S connection) { - leasedConnections.add(connection); - return connection; - } - - // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be - // fixed but there may be no need - do gedankanalysis - private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException, - TimeoutException { - if (availableConnections.isEmpty()) { - if (leasedConnections.isEmpty()) { - // Completely empty pool - try { - return createConnection(timeout); - } catch (Exception e) { - throw new ResourceExhaustedException("failed to create a new connection", e); - } - } else { - // If the pool is allowed to grow - let the connection factory race a release - if (connectionFactory.mightCreate()) { - executor.execute(new Runnable() { - @Override public void run() { - try { - // The connection timeout is not needed here to honor the callers get requested - // timeout, but we don't want to have an infinite timeout which could exhaust a - // thread pool over many backgrounded create calls - S connection = createConnection(timeout); - if (connection != null) { - addConnection(connection); - } else { - LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " + - "due to maximum pool size or timeout"); - } - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e); - } - } - }); - } - - try { - // We wait for a returned/new connection here in loops to guard against the - // "spurious wakeups" that are documented can occur with Condition.await() - if (timeout.getValue() == 0) { - while(availableConnections.isEmpty()) { - available.await(); - } - } else { - long timeRemainingNs = timeout.as(Time.NANOSECONDS); - while(availableConnections.isEmpty()) { - long start = System.nanoTime(); - if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) { - throw new TimeoutException( - "timeout waiting for a connection to be released to the pool"); - } else { - timeRemainingNs -= (System.nanoTime() - start); - } - } - if (availableConnections.isEmpty()) { - throw new TimeoutException( - "timeout waiting for a connection to be released to the pool"); - } - } - } catch (InterruptedException e) { - throw new TimeoutException("Interrupted while waiting for a connection."); - } - } - } - - return getAvailableConnection(); - } - - private S getAvailableConnection() { - S connection = (availableConnections.size() == 1) - ? Iterables.getOnlyElement(availableConnections) - : availableConnections.iterator().next(); - if (!availableConnections.remove(connection)) { - throw new IllegalArgumentException("Connection picked not in pool: " + connection); - } - return connection; - } - - private S createConnection(Amount<Long, Time> timeout) throws Exception { - S connection = connectionFactory.create(timeout); - if (connection != null) { - connectionsCreated.incrementAndGet(); - } - return connection; - } - - private void addConnection(S connection) { - poolLock.lock(); - try { - availableConnections.add(connection); - available.signal(); - } finally { - poolLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java deleted file mode 100644 index 4f75893..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; - -/** - * Utility methods for dealing with dynamic sets of hosts. - */ -public final class DynamicHostSetUtil { - - /** - * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of - * the underlying actual endpoints. - * - * @param hostSet The hostSet to snapshot. - * @throws DynamicHostSet.MonitorException if there was a problem obtaining the snapshot. - */ - public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws DynamicHostSet.MonitorException { - final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder(); - Command unwatch = hostSet.watch(new DynamicHostSet.HostChangeMonitor<T>() { - @Override public void onChange(ImmutableSet<T> hostSet) { - snapshot.addAll(hostSet); - } - }); - unwatch.execute(); - return snapshot.build(); - } - - private DynamicHostSetUtil() { - // utility - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java deleted file mode 100644 index 2fd6046..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.net.loadbalancing.LoadBalancer; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.ServerSet; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeoutException; - -/** - * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a - * {@link ServerSet}. - * - * @param <H> The type that contains metadata information about hosts, such as liveness and address. - * @param <T> The raw connection type that is being pooled. - * @param <E> The type that identifies the endpoint of the pool, such as an address. - * @author John Sirois - */ -public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> { - - private final MetaPool<T, E> pool; - - /** - * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools - * for the given {@code serverSet}. - * - * @param hostSet the dynamic set of available servers to pool connections for - * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint - * @param loadBalancer Load balancer to manage request flow. - * @param onBackendsChosen A callback to notify of chosen backends. - * @param restoreInterval the interval after connection errors start occurring for a target to - * begin checking to see if it has come back to a healthy state - * @param endpointExtractor Function that transforms a service instance into an endpoint instance. - * @param livenessChecker Filter that will determine whether a host indicates itself as available. - * @throws DynamicHostSet.MonitorException if there is a problem monitoring the host set - */ - public DynamicPool(DynamicHostSet<H> hostSet, - Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory, - LoadBalancer<E> loadBalancer, - Closure<Collection<E>> onBackendsChosen, - Amount<Long, Time> restoreInterval, - Function<H, E> endpointExtractor, - Predicate<H> livenessChecker) - throws DynamicHostSet.MonitorException { - Preconditions.checkNotNull(hostSet); - Preconditions.checkNotNull(endpointPoolFactory); - - pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval); - - // TODO(John Sirois): consider an explicit start/stop - hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor, - livenessChecker) { - @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, - Map<E, ObjectPool<Connection<T, E>>> livePools) { - poolRebuilt(deadPools, livePools); - } - }); - } - - @VisibleForTesting - void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, - Map<E, ObjectPool<Connection<T, E>>> livePools) { - - pool.setBackends(livePools); - - for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) { - deadTargetPool.close(); - } - } - - @Override - public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { - return pool.get(); - } - - @Override - public Connection<T, E> get(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - return pool.get(timeout); - } - - @Override - public void release(Connection<T, E> connection) { - pool.release(connection); - } - - @Override - public void remove(Connection<T, E> connection) { - pool.remove(connection); - } - - @Override - public void close() { - pool.close(); - } - - private abstract class PoolMonitor<H, S extends Connection<?, ?>> - implements DynamicHostSet.HostChangeMonitor<H> { - - private final Function<E, ObjectPool<S>> endpointPoolFactory; - private final Function<H, E> endpointExtractor; - private final Predicate<H> livenessTest; - - public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory, - Function<H, E> endpointExtractor, - Predicate<H> livenessTest) { - this.endpointPoolFactory = endpointPoolFactory; - this.endpointExtractor = endpointExtractor; - this.livenessTest = livenessTest; - } - - private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap(); - - @Override - public synchronized void onChange(ImmutableSet<H> serverSet) { - // TODO(John Sirois): change onChange to pass the delta data since its already computed by - // ServerSet - - Map<E, H> newEndpoints = - Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor); - - Set<E> deadEndpoints = ImmutableSet.copyOf( - Sets.difference(endpointPools.keySet(), newEndpoints.keySet())); - Set<ObjectPool<S>> deadPools = Sets.newHashSet(); - for (E endpoint : deadEndpoints) { - ObjectPool<S> deadPool = endpointPools.remove(endpoint); - deadPools.add(deadPool); - } - - Set<E> addedEndpoints = ImmutableSet.copyOf( - Sets.difference(newEndpoints.keySet(), endpointPools.keySet())); - for (E endpoint : addedEndpoints) { - ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint); - endpointPools.put(endpoint, endpointPool); - } - - onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools)); - } - - protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools, - Map<E, ObjectPool<S>> livePools); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java deleted file mode 100644 index df1bd96..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.loadbalancing.LoadBalancer; -import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -/** - * A connection pool that picks connections from a set of backend pools. Backend pools are selected - * from randomly initially but then as they are used they are ranked according to how many - * connections they have available and whether or not the last used connection had an error or not. - * In this way, backends that are responsive should get selected in preference to those that are - * not. - * - * <p>Non-responsive backends are monitored after a configurable period in a background thread and - * if a connection can be obtained they start to float back up in the rankings. In this way, - * backends that are initially non-responsive but later become responsive should end up getting - * selected. - * - * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command - * - * @author John Sirois - */ -public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> { - - private final Command stopBackendRestorer; - - private Map<E, ObjectPool<Connection<T, E>>> backends = null; - - // Locks to guard mutation of the backends set. - private final Lock backendsReadLock; - private final Lock backendsWriteLock; - - private final Closure<Collection<E>> onBackendsChosen; - - private final LoadBalancer<E> loadBalancer; - - /** - * Creates a connection pool with no backends. Backends may be added post-creation by calling - * {@link #setBackends(java.util.Map)} - * - * @param loadBalancer the load balancer to distribute requests among backends. - * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new - * set of backends to restrict its call distribution to - * @param restoreInterval the interval after a backend goes dead to begin checking the backend to - * see if it has come back to a healthy state - */ - public MetaPool(LoadBalancer<E> loadBalancer, - Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { - this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer, - onBackendsChosen, restoreInterval); - } - - /** - * Creates a connection pool that balances connections across multiple backend pools. - * - * @param backends the connection pools for the backends - * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new - * set of backends to restrict its call distribution to - * @param loadBalancer the load balancer to distribute requests among backends. - * @param restoreInterval the interval after a backend goes dead to begin checking the backend to - * see if it has come back to a healthy state - */ - public MetaPool( - ImmutableMap<E, ObjectPool<Connection<T, E>>> backends, - LoadBalancer<E> loadBalancer, - Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { - - this.loadBalancer = Preconditions.checkNotNull(loadBalancer); - this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen); - - ReadWriteLock backendsLock = new ReentrantReadWriteLock(true); - backendsReadLock = backendsLock.readLock(); - backendsWriteLock = backendsLock.writeLock(); - - setBackends(backends); - - Preconditions.checkNotNull(restoreInterval); - Preconditions.checkArgument(restoreInterval.getValue() > 0); - stopBackendRestorer = startDeadBackendRestorer(restoreInterval); - } - - /** - * Assigns the backend pools that this pool should draw from. - * - * @param pools New pools to use. - */ - public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) { - backendsWriteLock.lock(); - try { - backends = Preconditions.checkNotNull(pools); - loadBalancer.offerBackends(pools.keySet(), onBackendsChosen); - } finally { - backendsWriteLock.unlock(); - } - } - - private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) { - - final AtomicBoolean shouldRestore = new AtomicBoolean(true); - Runnable restoreDeadBackends = new Runnable() { - @Override public void run() { - if (shouldRestore.get()) { - restoreDeadBackends(restoreInterval); - } - } - }; - final ScheduledExecutorService scheduledExecutorService = - Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("MTCP-DeadBackendRestorer[%s]") - .build()); - long restoreDelay = restoreInterval.getValue(); - scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay, - restoreDelay, restoreInterval.getUnit().getTimeUnit()); - - return new Command() { - @Override public void execute() { - shouldRestore.set(false); - scheduledExecutorService.shutdownNow(); - LOG.info("Backend restorer shut down"); - } - }; - } - - private static final Logger LOG = Logger.getLogger(MetaPool.class.getName()); - - private void restoreDeadBackends(Amount<Long, Time> restoreInterval) { - for (E backend : snapshotBackends()) { - ObjectPool<Connection<T, E>> pool; - backendsReadLock.lock(); - try { - pool = backends.get(backend); - } finally { - backendsReadLock.unlock(); - } - - // We can lose a race if the backends change - and that's fine, we'll restore the new set of - // backends in the next scheduled restoration run. - if (pool != null) { - try { - release(get(backend, pool, restoreInterval)); - } catch (TimeoutException e) { - LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); - } catch (ResourceExhaustedException e) { - LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); - } - } - } - } - - private Iterable<E> snapshotBackends() { - backendsReadLock.lock(); - try { - return ImmutableList.copyOf(backends.keySet()); - } finally { - backendsReadLock.unlock(); - } - } - - @Override - public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { - return get(ObjectPool.NO_TIMEOUT); - } - - @Override - public Connection<T, E> get(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - - E backend; - ObjectPool<Connection<T, E>> pool; - - backendsReadLock.lock(); - try { - backend = loadBalancer.nextBackend(); - Preconditions.checkNotNull(backend, "Load balancer gave a null backend."); - - pool = backends.get(backend); - Preconditions.checkNotNull(backend, - "Given backend %s not found in tracked backends: %s", backend, backends); - } finally { - backendsReadLock.unlock(); - } - - return get(backend, pool, timeout); - } - - private static class ManagedConnection<T, E> implements Connection<T, E> { - private final Connection<T, E> connection; - private final ObjectPool<Connection<T, E>> pool; - - private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) { - this.connection = connection; - this.pool = pool; - } - - @Override - public void close() { - connection.close(); - } - - @Override - public T get() { - return connection.get(); - } - - @Override - public boolean isValid() { - return connection.isValid(); - } - - @Override - public E getEndpoint() { - return connection.getEndpoint(); - } - - @Override public String toString() { - return "ManagedConnection[" + connection.toString() + "]"; - } - - void release(boolean remove) { - if (remove) { - pool.remove(connection); - } else { - pool.release(connection); - } - } - } - - private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool, - Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException { - - long startNanos = System.nanoTime(); - try { - Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout); - - // BEWARE: We have leased a connection from the underlying pool here and must return it to the - // caller so they can later release it. If we fail to do so, the connection will leak. - // Catching intermediate exceptions ourselves and pro-actively returning the connection to the - // pool before re-throwing is not a viable option since the return would have to succeed, - // forcing us to ignore the timeout passed in. - - // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire - // a (backend, pool) pair atomically that has since been removed, we can safely let the lb - // know about backend events and it will just ignore us. - - try { - loadBalancer.connected(backend, System.nanoTime() - startNanos); - } catch (RuntimeException e) { - LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after " - + "leasing a connection - continuing", e); - } - return new ManagedConnection<T, E>(connection, pool); - } catch (TimeoutException e) { - loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT); - throw e; - } catch (ResourceExhaustedException e) { - loadBalancer.connectFailed(backend, ConnectionResult.FAILED); - throw e; - } - } - - @Override - public void release(Connection<T, E> connection) { - release(connection, false); - } - - /** - * Equivalent to releasing a Connection with isValid() == false. - * @see ObjectPool#remove(Object) - */ - @Override - public void remove(Connection<T, E> connection) { - release(connection, true); - } - - private void release(Connection<T, E> connection, boolean remove) { - backendsWriteLock.lock(); - try { - - if (!(connection instanceof ManagedConnection)) { - throw new IllegalArgumentException("Connection not controlled by this connection pool: " - + connection); - } - ((ManagedConnection) connection).release(remove); - - loadBalancer.released(connection.getEndpoint()); - } finally { - backendsWriteLock.unlock(); - } - } - - @Override - public void close() { - stopBackendRestorer.execute(); - - backendsWriteLock.lock(); - try { - for (ObjectPool<Connection<T, E>> backend : backends.values()) { - backend.close(); - } - } finally { - backendsWriteLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java deleted file mode 100644 index a665903..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import java.util.concurrent.TimeoutException; - -/** - * A generic object pool that provides object of a given type for exclusive use by the caller. - * Object pools generally pool expensive resources and so offer a {@link #close} method that should - * be used to free these resources when the pool is no longer needed. - * - * @author John Sirois - */ -public interface ObjectPool<T> { - - /** - * Gets a resource potentially blocking for as long as it takes to either create a new one or wait - * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release} - * the connection when they are done with it. - * - * @return a resource for exclusive use by the caller - * @throws ResourceExhaustedException if no resource could be obtained because this pool was - * exhausted - * @throws TimeoutException if we timed out while trying to fetch a resource - */ - T get() throws ResourceExhaustedException, TimeoutException; - - /** - * A convenience constant representing a no timeout. - */ - Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS); - - /** - * Gets a resource; timing out if there are none available and it takes longer than specified to - * create a new one or wait for one to be {@link #release(Object) released}. Callers must - * {@link #release (Object) release} the connection when they are done with it. - * - * @param timeout the maximum amount of time to wait - * @return a resource for exclusive use by the caller - * @throws TimeoutException if the specified timeout was reached before a resource became - * available - * @throws ResourceExhaustedException if no resource could be obtained because this pool was - * exhausted - */ - T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException; - - /** - * Releases a resource obtained from this pool back into the pool of available resources. It is an - * error to release a resource not obtained from this pool. - * - * @param resource Resource to release. - */ - void release(T resource); - - /** - * Removes a resource obtained from this pool from its available resources. It is an error to - * remove a resource not obtained from this pool. - * - * @param resource Resource to remove. - */ - void remove(T resource); - - /** - * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects - * when they are released. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java deleted file mode 100644 index fd48ddb..0000000 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.net.pool; - -/** - * @author John Sirois - */ -public class ResourceExhaustedException extends Exception { - public ResourceExhaustedException(String message) { - super(message); - } - - public ResourceExhaustedException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java deleted file mode 100644 index 95c8868..0000000 --- a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java +++ /dev/null @@ -1,427 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.objectsize; - -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryPoolMXBean; -import java.lang.reflect.Array; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Deque; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Sets; - -/** - * Contains utility methods for calculating the memory usage of objects. It - * only works on the HotSpot JVM, and infers the actual memory layout (32 bit - * vs. 64 bit word size, compressed object pointers vs. uncompressed) from - * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. - * It can only make an educated guess at whether compressed OOPs are used, - * though; specifically, it knows what the JVM's default choice of OOP - * compression would be based on HotSpot version and maximum heap sizes, but if - * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line - * switch, it can not detect - * this fact and will report incorrect sizes, as it will presume the default JVM - * behavior. - * - * @author Attila Szegedi - */ -public class ObjectSizeCalculator { - - /** - * Describes constant memory overheads for various constructs in a JVM implementation. - */ - public interface MemoryLayoutSpecification { - - /** - * Returns the fixed overhead of an array of any type or length in this JVM. - * - * @return the fixed overhead of an array. - */ - int getArrayHeaderSize(); - - /** - * Returns the fixed overhead of for any {@link Object} subclass in this JVM. - * - * @return the fixed overhead of any object. - */ - int getObjectHeaderSize(); - - /** - * Returns the quantum field size for a field owned by an object in this JVM. - * - * @return the quantum field size for an object. - */ - int getObjectPadding(); - - /** - * Returns the fixed size of an object reference in this JVM. - * - * @return the size of all object references. - */ - int getReferenceSize(); - - /** - * Returns the quantum field size for a field owned by one of an object's ancestor superclasses - * in this JVM. - * - * @return the quantum field size for a superclass field. - */ - int getSuperclassFieldPadding(); - } - - private static class CurrentLayout { - private static final MemoryLayoutSpecification SPEC = - getEffectiveMemoryLayoutSpecification(); - } - - /** - * Given an object, returns the total allocated size, in bytes, of the object - * and all other objects reachable from it. Attempts to to detect the current JVM memory layout, - * but may fail with {@link UnsupportedOperationException}; - * - * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do - * anything special, it measures the size of all objects - * reachable through it (which will include its class loader, and by - * extension, all other Class objects loaded by - * the same loader, and all the parent class loaders). It doesn't provide the - * size of the static fields in the JVM class that the Class object - * represents. - * @return the total allocated size of the object and all other objects it - * retains. - * @throws UnsupportedOperationException if the current vm memory layout cannot be detected. - */ - public static long getObjectSize(Object obj) throws UnsupportedOperationException { - return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj); - } - - // Fixed object header size for arrays. - private final int arrayHeaderSize; - // Fixed object header size for non-array objects. - private final int objectHeaderSize; - // Padding for the object size - if the object size is not an exact multiple - // of this, it is padded to the next multiple. - private final int objectPadding; - // Size of reference (pointer) fields. - private final int referenceSize; - // Padding for the fields of superclass before fields of subclasses are - // added. - private final int superclassFieldPadding; - - private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos = - CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() { - public ClassSizeInfo load(Class<?> clazz) { - return new ClassSizeInfo(clazz); - } - }); - - - private final Set<Object> alreadyVisited = Sets.newIdentityHashSet(); - private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024); - private long size; - - /** - * Creates an object size calculator that can calculate object sizes for a given - * {@code memoryLayoutSpecification}. - * - * @param memoryLayoutSpecification a description of the JVM memory layout. - */ - public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) { - Preconditions.checkNotNull(memoryLayoutSpecification); - arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize(); - objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize(); - objectPadding = memoryLayoutSpecification.getObjectPadding(); - referenceSize = memoryLayoutSpecification.getReferenceSize(); - superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding(); - } - - /** - * Given an object, returns the total allocated size, in bytes, of the object - * and all other objects reachable from it. - * - * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do - * anything special, it measures the size of all objects - * reachable through it (which will include its class loader, and by - * extension, all other Class objects loaded by - * the same loader, and all the parent class loaders). It doesn't provide the - * size of the static fields in the JVM class that the Class object - * represents. - * @return the total allocated size of the object and all other objects it - * retains. - */ - public synchronized long calculateObjectSize(Object obj) { - // Breadth-first traversal instead of naive depth-first with recursive - // implementation, so we don't blow the stack traversing long linked lists. - try { - for (;;) { - visit(obj); - if (pending.isEmpty()) { - return size; - } - obj = pending.removeFirst(); - } - } finally { - alreadyVisited.clear(); - pending.clear(); - size = 0; - } - } - - private void visit(Object obj) { - if (alreadyVisited.contains(obj)) { - return; - } - final Class<?> clazz = obj.getClass(); - if (clazz == ArrayElementsVisitor.class) { - ((ArrayElementsVisitor) obj).visit(this); - } else { - alreadyVisited.add(obj); - if (clazz.isArray()) { - visitArray(obj); - } else { - classSizeInfos.getUnchecked(clazz).visit(obj, this); - } - } - } - - private void visitArray(Object array) { - final Class<?> componentType = array.getClass().getComponentType(); - final int length = Array.getLength(array); - if (componentType.isPrimitive()) { - increaseByArraySize(length, getPrimitiveFieldSize(componentType)); - } else { - increaseByArraySize(length, referenceSize); - // If we didn't use an ArrayElementsVisitor, we would be enqueueing every - // element of the array here instead. For large arrays, it would - // tremendously enlarge the queue. In essence, we're compressing it into - // a small command object instead. This is different than immediately - // visiting the elements, as their visiting is scheduled for the end of - // the current queue. - switch (length) { - case 0: { - break; - } - case 1: { - enqueue(Array.get(array, 0)); - break; - } - default: { - enqueue(new ArrayElementsVisitor((Object[]) array)); - } - } - } - } - - private void increaseByArraySize(int length, long elementSize) { - increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding)); - } - - private static class ArrayElementsVisitor { - private final Object[] array; - - ArrayElementsVisitor(Object[] array) { - this.array = array; - } - - public void visit(ObjectSizeCalculator calc) { - for (Object elem : array) { - if (elem != null) { - calc.visit(elem); - } - } - } - } - - void enqueue(Object obj) { - if (obj != null) { - pending.addLast(obj); - } - } - - void increaseSize(long objectSize) { - size += objectSize; - } - - @VisibleForTesting - static long roundTo(long x, int multiple) { - return ((x + multiple - 1) / multiple) * multiple; - } - - private class ClassSizeInfo { - // Padded fields + header size - private final long objectSize; - // Only the fields size - used to calculate the subclasses' memory - // footprint. - private final long fieldsSize; - private final Field[] referenceFields; - - public ClassSizeInfo(Class<?> clazz) { - long fieldsSize = 0; - final List<Field> referenceFields = new LinkedList<Field>(); - for (Field f : clazz.getDeclaredFields()) { - if (Modifier.isStatic(f.getModifiers())) { - continue; - } - final Class<?> type = f.getType(); - if (type.isPrimitive()) { - fieldsSize += getPrimitiveFieldSize(type); - } else { - f.setAccessible(true); - referenceFields.add(f); - fieldsSize += referenceSize; - } - } - final Class<?> superClass = clazz.getSuperclass(); - if (superClass != null) { - final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass); - fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding); - referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields)); - } - this.fieldsSize = fieldsSize; - this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding); - this.referenceFields = referenceFields.toArray( - new Field[referenceFields.size()]); - } - - void visit(Object obj, ObjectSizeCalculator calc) { - calc.increaseSize(objectSize); - enqueueReferencedObjects(obj, calc); - } - - public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { - for (Field f : referenceFields) { - try { - calc.enqueue(f.get(obj)); - } catch (IllegalAccessException e) { - final AssertionError ae = new AssertionError( - "Unexpected denial of access to " + f); - ae.initCause(e); - throw ae; - } - } - } - } - - private static long getPrimitiveFieldSize(Class<?> type) { - if (type == boolean.class || type == byte.class) { - return 1; - } - if (type == char.class || type == short.class) { - return 2; - } - if (type == int.class || type == float.class) { - return 4; - } - if (type == long.class || type == double.class) { - return 8; - } - throw new AssertionError("Encountered unexpected primitive type " + - type.getName()); - } - - @VisibleForTesting - static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() { - final String vmName = System.getProperty("java.vm.name"); - if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") - || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) { - throw new UnsupportedOperationException( - "ObjectSizeCalculator only supported on HotSpot VM"); - } - - final String dataModel = System.getProperty("sun.arch.data.model"); - if ("32".equals(dataModel)) { - // Running with 32-bit data model - return new MemoryLayoutSpecification() { - @Override public int getArrayHeaderSize() { - return 12; - } - @Override public int getObjectHeaderSize() { - return 8; - } - @Override public int getObjectPadding() { - return 8; - } - @Override public int getReferenceSize() { - return 4; - } - @Override public int getSuperclassFieldPadding() { - return 4; - } - }; - } else if (!"64".equals(dataModel)) { - throw new UnsupportedOperationException("Unrecognized value '" + - dataModel + "' of sun.arch.data.model system property"); - } - - final String strVmVersion = System.getProperty("java.vm.version"); - final int vmVersion = Integer.parseInt(strVmVersion.substring(0, - strVmVersion.indexOf('.'))); - if (vmVersion >= 17) { - long maxMemory = 0; - for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { - maxMemory += mp.getUsage().getMax(); - } - if (maxMemory < 30L * 1024 * 1024 * 1024) { - // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total - // for all memory pools (yes, including code cache). - return new MemoryLayoutSpecification() { - @Override public int getArrayHeaderSize() { - return 16; - } - @Override public int getObjectHeaderSize() { - return 12; - } - @Override public int getObjectPadding() { - return 8; - } - @Override public int getReferenceSize() { - return 4; - } - @Override public int getSuperclassFieldPadding() { - return 4; - } - }; - } - } - - // In other cases, it's a 64-bit uncompressed OOPs object model - return new MemoryLayoutSpecification() { - @Override public int getArrayHeaderSize() { - return 24; - } - @Override public int getObjectHeaderSize() { - return 16; - } - @Override public int getObjectPadding() { - return 8; - } - @Override public int getReferenceSize() { - return 8; - } - @Override public int getSuperclassFieldPadding() { - return 8; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java deleted file mode 100644 index cfbf04e..0000000 --- a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java +++ /dev/null @@ -1,563 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.stats; - -import java.util.Arrays; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; - -/** - * <p> - * Implements Histogram structure for computing approximate quantiles. - * The implementation is based on the following paper: - * - * <pre> - * [MP80] Munro & Paterson, "Selection and Sorting with Limited Storage", - * Theoretical Computer Science, Vol 12, p 315-323, 1980. - * </pre> - * </p> - * <p> - * You could read a detailed description of the same algorithm here: - * - * <pre> - * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other - * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM - * SIGMOD, Vol 27, No 2, p 426-435, June 1998. - * </pre> - * </p> - * <p> - * There's a good explanation of the algorithm in the Sawzall source code - * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc - * </p> - * Here's a schema of the tree: - * <pre> - * [4] level 3, weight=rootWeight=8 - * | - * [3] level 2, weight=4 - * | - * [2] level 1, weight=2 - * / \ - * [0] [1] level 0, weight=1 - * </pre> - * <p> - * {@code [i]} represents {@code buffer[i]} - * The depth of the tree is limited to a maximum value - * Every buffer has the same size - * </p> - * <p> - * We add element in {@code [0]} or {@code [1]}. - * When {@code [0]} and {@code [1]} are full, we collapse them, it generates a temporary buffer - * of weight 2, if {@code [2]} is empty, we put the collapsed buffer into {@code [2]} otherwise - * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty and - * so on... - * </p> - */ -public final class ApproximateHistogram implements Histogram { - @VisibleForTesting - public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000); - @VisibleForTesting - public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB); - @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long - - // See above - @VisibleForTesting long[][] buffer; - @VisibleForTesting long count = 0L; - @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves - @VisibleForTesting int currentTop = 1; - @VisibleForTesting int[] indices; // member for optimization reason - private boolean leavesSorted = true; - private int rootWeight = 1; - private long[][] bufferPool; // pool of 2 buffers (used for merging) - private int bufferSize; - private int maxDepth; - - /** - * Private init method that is called only by constructors. - * All allocations are done in this method. - * - * @param bufSize size of each buffer - * @param depth maximum depth of the tree of buffers - */ - @VisibleForTesting - void init(int bufSize, int depth) { - bufferSize = bufSize; - maxDepth = depth; - bufferPool = new long[2][bufferSize]; - indices = new int[depth + 1]; - buffer = new long[depth + 1][bufferSize]; - // only allocate the first 2 buffers, lazily allocate the others. - allocate(0); - allocate(1); - Arrays.fill(buffer, 2, buffer.length, null); - clear(); - } - - @VisibleForTesting - ApproximateHistogram(int bufSize, int depth) { - init(bufSize, depth); - } - - /** - * Constructor with precision constraint, it will allocated as much memory as require to match - * this precision constraint. - * @param precision the requested precision - */ - public ApproximateHistogram(Precision precision) { - Preconditions.checkNotNull(precision); - int depth = computeDepth(precision.getEpsilon(), precision.getN()); - int bufSize = computeBufferSize(depth, precision.getN()); - init(bufSize, depth); - } - - /** - * Constructor with memory constraint, it will find the best possible precision that satisfied - * the memory constraint. - * @param maxMemory the maximum amount of memory that the instance will take - */ - public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) { - Preconditions.checkNotNull(maxMemory); - Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES), - "at least 1KB is required for an Histogram"); - - double epsilon = DEFAULT_PRECISION.getEpsilon(); - int n = expectedSize; - int depth = computeDepth(epsilon, n); - int bufSize = computeBufferSize(depth, n); - long maxBytes = maxMemory.as(Data.BYTES); - - // Increase precision if the maxMemory allow it, otherwise reduce precision. (by 5% steps) - boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes; - double multiplier = tooMuchMem ? 1.05 : 0.95; - while((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) { - epsilon *= multiplier; - if (epsilon < 0.00001) { - // for very high memory constraint increase N as well - n *= 10; - epsilon = DEFAULT_PRECISION.getEpsilon(); - } - depth = computeDepth(epsilon, n); - bufSize = computeBufferSize(depth, n); - } - if (!tooMuchMem) { - // It's ok to consume less memory than the constraint - // but we never have to consume more! - depth = computeDepth(epsilon / multiplier, n); - bufSize = computeBufferSize(depth, n); - } - - init(bufSize, depth); - } - - /** - * Constructor with memory constraint. - * @see #ApproximateHistogram(Amount, int) - */ - public ApproximateHistogram(Amount<Long, Data> maxMemory) { - this(maxMemory, DEFAULT_PRECISION.getN()); - } - - /** - * Default Constructor. - * @see #ApproximateHistogram(Amount) - */ - public ApproximateHistogram() { - this(DEFAULT_MAX_MEMORY); - } - - @Override - public synchronized void add(long x) { - // if the leaves of the tree are full, "collapse" recursively the tree - if (leafCount == 2 * bufferSize) { - Arrays.sort(buffer[0]); - Arrays.sort(buffer[1]); - recCollapse(buffer[0], 1); - leafCount = 0; - } - - // Now we're sure there is space for adding x - if (leafCount < bufferSize) { - buffer[0][leafCount] = x; - } else { - buffer[1][leafCount - bufferSize] = x; - } - leafCount++; - count++; - leavesSorted = (leafCount == 1); - } - - @Override - public synchronized long getQuantile(double q) { - Preconditions.checkArgument(0.0 <= q && q <= 1.0, - "quantile must be in the range 0.0 to 1.0 inclusive"); - if (count == 0) { - return 0L; - } - - // the two leaves are the only buffer that can be partially filled - int buf0Size = Math.min(bufferSize, leafCount); - int buf1Size = Math.max(0, leafCount - buf0Size); - long sum = 0; - long target = (long) Math.ceil(count * (1.0 - q)); - int i; - - if (! leavesSorted) { - Arrays.sort(buffer[0], 0, buf0Size); - Arrays.sort(buffer[1], 0, buf1Size); - leavesSorted = true; - } - Arrays.fill(indices, bufferSize - 1); - indices[0] = buf0Size - 1; - indices[1] = buf1Size - 1; - - do { - i = biggest(indices); - indices[i]--; - sum += weight(i); - } while (sum < target); - return buffer[i][indices[i] + 1]; - } - - @Override - public synchronized long[] getQuantiles(double[] quantiles) { - return Histograms.extractQuantiles(this, quantiles); - } - - @Override - public synchronized void clear() { - count = 0L; - leafCount = 0; - currentTop = 1; - rootWeight = 1; - leavesSorted = true; - } - - /** - * MergedHistogram is a Wrapper on top of multiple histograms, it gives a view of all the - * underlying histograms as it was just one. - * Note: Should only be used for querying the underlying histograms. - */ - private static class MergedHistogram implements Histogram { - private final ApproximateHistogram[] histograms; - - private MergedHistogram(ApproximateHistogram[] histograms) { - this.histograms = histograms; - } - - @Override - public void add(long x) { - /* Ignore, Shouldn't be used */ - assert(false); - } - - @Override - public void clear() { - /* Ignore, Shouldn't be used */ - assert(false); - } - - @Override - public long getQuantile(double quantile) { - Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0, - "quantile must be in the range 0.0 to 1.0 inclusive"); - - long count = initIndices(); - if (count == 0) { - return 0L; - } - - long sum = 0; - long target = (long) Math.ceil(count * (1.0 - quantile)); - int iHist = -1; - int iBiggest = -1; - do { - long biggest = Long.MIN_VALUE; - for (int i = 0; i < histograms.length; i++) { - ApproximateHistogram hist = histograms[i]; - int indexBiggest = hist.biggest(hist.indices); - if (indexBiggest >= 0) { - long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]]; - if (iBiggest == -1 || biggest <= value) { - iBiggest = indexBiggest; - biggest = value; - iHist = i; - } - } - } - histograms[iHist].indices[iBiggest]--; - sum += histograms[iHist].weight(iBiggest); - } while (sum < target); - - ApproximateHistogram hist = histograms[iHist]; - int i = hist.indices[iBiggest]; - return hist.buffer[iBiggest][i + 1]; - } - - @Override - public synchronized long[] getQuantiles(double[] quantiles) { - return Histograms.extractQuantiles(this, quantiles); - } - - /** - * Initialize the indices array for each Histogram and return the global count. - */ - private long initIndices() { - long count = 0L; - for (int i = 0; i < histograms.length; i++) { - ApproximateHistogram h = histograms[i]; - int[] indices = h.indices; - count += h.count; - int buf0Size = Math.min(h.bufferSize, h.leafCount); - int buf1Size = Math.max(0, h.leafCount - buf0Size); - - if (! h.leavesSorted) { - Arrays.sort(h.buffer[0], 0, buf0Size); - Arrays.sort(h.buffer[1], 0, buf1Size); - h.leavesSorted = true; - } - Arrays.fill(indices, h.bufferSize - 1); - indices[0] = buf0Size - 1; - indices[1] = buf1Size - 1; - } - return count; - } - } - - /** - * Return a MergedHistogram - * @param histograms array of histograms to merged together - * @return a new Histogram - */ - public static Histogram merge(ApproximateHistogram[] histograms) { - return new MergedHistogram(histograms); - } - - /** - * We compute the "smallest possible b" satisfying two inequalities: - * 1) (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N - * 2) k * (2 ^ (b - 1)) >= N - * - * For an explanation of these inequalities, please read the Munro-Paterson or - * the Manku-Rajagopalan-Linday papers. - */ - @VisibleForTesting static int computeDepth(double epsilon, long n) { - int b = 2; - while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) { - b += 1; - } - return b; - } - - @VisibleForTesting static int computeBufferSize(int depth, long n) { - return (int) (n / (1L << (depth - 1))); - } - - /** - * Return an estimation of the memory used by an instance. - * The size is due to: - * - a fix cost (76 bytes) for the class + fields - * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE) - * - indices: 16 + sizeof(Integer) * (depth + 1) - * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE) - * - * Note: This method is tested with unit test, it will break if you had new fields. - * @param bufferSize the size of a buffer - * @param depth the depth of the tree of buffer (depth + 1 buffers) - */ - @VisibleForTesting - static long memoryUsage(int bufferSize, int depth) { - return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3)); - } - - /** - * Return the level of the biggest element (using the indices array 'ids' - * to track which elements have been already returned). Every buffer has - * already been sorted at this point. - * @return the level of the biggest element or -1 if no element has been found - */ - @VisibleForTesting - int biggest(final int[] ids) { - long biggest = Long.MIN_VALUE; - final int id0 = ids[0], id1 = ids[1]; - int iBiggest = -1; - - if (0 < leafCount && 0 <= id0) { - biggest = buffer[0][id0]; - iBiggest = 0; - } - if (bufferSize < leafCount && 0 <= id1) { - long x = buffer[1][id1]; - if (x > biggest) { - biggest = x; - iBiggest = 1; - } - } - for (int i = 2; i < currentTop + 1; i++) { - if (!isBufferEmpty(i) && 0 <= ids[i]) { - long x = buffer[i][ids[i]]; - if (x > biggest) { - biggest = x; - iBiggest = i; - } - } - } - return iBiggest; - } - - - /** - * Based on the number of elements inserted we can easily know if a buffer - * is empty or not - */ - @VisibleForTesting - boolean isBufferEmpty(int level) { - if (level == currentTop) { - return false; // root buffer (if present) is always full - } else { - long levelWeight = 1 << (level - 1); - return (((count - leafCount) / bufferSize) & levelWeight) == 0; - } - } - - /** - * Return the weight of the level ie. 2^(i-1) except for the two tree - * leaves (weight=1) and for the root - */ - private int weight(int level) { - if (level == 0) { - return 1; - } else if (level == maxDepth) { - return rootWeight; - } else { - return 1 << (level - 1); - } - } - - private void allocate(int i) { - if (buffer[i] == null) { - buffer[i] = new long[bufferSize]; - } - } - - /** - * Recursively collapse the buffers of the tree. - * Upper buffers will be allocated on first access in this method. - */ - private void recCollapse(long[] buf, int level) { - // if we reach the root, we can't add more buffer - if (level == maxDepth) { - // weight() return the weight of the root, in that case we need the - // weight of merge result - int mergeWeight = 1 << (level - 1); - int idx = level % 2; - long[] merged = bufferPool[idx]; - long[] tmp = buffer[level]; - collapse(buf, mergeWeight, buffer[level], rootWeight, merged); - buffer[level] = merged; - bufferPool[idx] = tmp; - rootWeight += mergeWeight; - } else { - allocate(level + 1); // lazy allocation (if needed) - if (level == currentTop) { - // if we reach the top, add a new buffer - collapse1(buf, buffer[level], buffer[level + 1]); - currentTop += 1; - rootWeight *= 2; - } else if (isBufferEmpty(level + 1)) { - // if the upper buffer is empty, use it - collapse1(buf, buffer[level], buffer[level + 1]); - } else { - // it the upper buffer isn't empty, collapse with it - long[] merged = bufferPool[level % 2]; - collapse1(buf, buffer[level], merged); - recCollapse(merged, level + 1); - } - } - } - - /** - * collapse two sorted Arrays of different weight - * ex: [2,5,7] weight 2 and [3,8,9] weight 3 - * weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9] - * sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9] - * select every nth elems = [3,7,9] (n = sum weight / 2) - */ - @VisibleForTesting - static void collapse( - long[] left, - int leftWeight, - long[] right, - int rightWeight, - long[] output) { - - int totalWeight = leftWeight + rightWeight; - int halfTotalWeight = (totalWeight / 2) - 1; - int i = 0, j = 0, k = 0, cnt = 0; - - int weight; - long smallest; - - while (i < left.length || j < right.length) { - if (i < left.length && (j == right.length || left[i] < right[j])) { - smallest = left[i]; - weight = leftWeight; - i++; - } else { - smallest = right[j]; - weight = rightWeight; - j++; - } - - int cur = (cnt + halfTotalWeight) / totalWeight; - cnt += weight; - int next = (cnt + halfTotalWeight) / totalWeight; - - for(; cur < next; cur++) { - output[k] = smallest; - k++; - } - } - } - -/** - * Optimized version of collapse for collapsing two array of the same weight - * (which is what we want most of the time) - */ - private static void collapse1( - long[] left, - long[] right, - long[] output) { - - int i = 0, j = 0, k = 0, cnt = 0; - long smallest; - - while (i < left.length || j < right.length) { - if (i < left.length && (j == right.length || left[i] < right[j])) { - smallest = left[i]; - i++; - } else { - smallest = right[j]; - j++; - } - if (cnt % 2 == 1) { - output[k] = smallest; - k++; - } - cnt++; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java deleted file mode 100644 index 024e67b..0000000 --- a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.stats; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.logging.Logger; - -/** - * A map from a key type to integers. This simplifies the process of storing counters for multiple - * values of the same type. - */ -public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable { - private final Map<K, Integer> map = Maps.newHashMap(); - - private static Logger log = Logger.getLogger(CounterMap.class.getName()); - - /** - * Increments the counter value associated with {@code key}, and returns the new value. - * - * @param key The key to increment - * @return The incremented value. - */ - public int incrementAndGet(K key) { - return incrementAndGet(key, 1); - } - - /** - * Increments the value associated with {@code key} by {@code value}, returning the new value. - * - * @param key The key to increment - * @return The incremented value. - */ - public int incrementAndGet(K key, int count) { - Integer value = map.get(key); - if (value == null) { - value = 0; - } - int newValue = count + value; - map.put(key, newValue); - return newValue; - } - - /** - * Gets the value associated with a key. - * - * @param key The key to look up. - * @return The counter value stored for {@code key}, or 0 if no mapping exists. - */ - public int get(K key) { - if (!map.containsKey(key)) { - return 0; - } - - return map.get(key); - } - - /** - * Assigns a value to a key. - * - * @param key The key to assign a value to. - * @param newValue The value to assign. - */ - public void set(K key, int newValue) { - Preconditions.checkNotNull(key); - map.put(key, newValue); - } - - /** - * Resets the value for {@code key}. This will remove the key from the counter. - * - * @param key The key to reset. - */ - public void reset(K key) { - map.remove(key); - } - - /** - * Gets the number of entries stored in the map. - * - * @return The size of the map. - */ - public int size() { - return map.size(); - } - - /** - * Gets an iterator for the mapped values. - * - * @return Iterator for mapped values. - */ - public Iterator<Map.Entry<K, Integer>> iterator() { - return map.entrySet().iterator(); - } - - public Collection<Integer> values() { - return map.values(); - } - - public Set<K> keySet() { - return map.keySet(); - } - - public String toString() { - StringBuilder strVal = new StringBuilder(); - for (Map.Entry<K, Integer> entry : this) { - strVal.append(entry.getKey().toString()).append(": ").append(entry.getValue()).append('\n'); - } - return strVal.toString(); - } - - public Map<K, Integer> toMap() { - return map; - } - - @Override - public CounterMap<K> clone() { - CounterMap<K> newInstance = new CounterMap<K>(); - newInstance.map.putAll(map); - return newInstance; - } -}
