http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java b/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java deleted file mode 100644 index 9b76147..0000000 --- a/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.net.http.handlers; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; - -import javax.annotation.Nullable; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Splitter; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.net.MediaType; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.inject.Inject; - -import com.twitter.common.collections.Iterables2; -import com.twitter.common.stats.TimeSeries; -import com.twitter.common.stats.TimeSeriesRepository; - -/** - * A servlet that provides time series data in JSON format. - */ -public class TimeSeriesDataSource extends HttpServlet { - - @VisibleForTesting static final String TIME_METRIC = "time"; - - private static final String METRICS = "metrics"; - private static final String SINCE = "since"; - - private final TimeSeriesRepository timeSeriesRepo; - private final Gson gson = new Gson(); - - @Inject - public TimeSeriesDataSource(TimeSeriesRepository timeSeriesRepo) { - this.timeSeriesRepo = Preconditions.checkNotNull(timeSeriesRepo); - } - - @VisibleForTesting - String getResponse( - @Nullable String metricsQuery, - @Nullable String sinceQuery) throws MetricException { - - if (metricsQuery == null) { - // Return metric listing. 
- return gson.toJson(ImmutableList.copyOf(timeSeriesRepo.getAvailableSeries())); - } - - List<Iterable<Number>> tsData = Lists.newArrayList(); - tsData.add(timeSeriesRepo.getTimestamps()); - // Ignore requests for "time" since it is implicitly returned. - Iterable<String> names = Iterables.filter( - Splitter.on(",").split(metricsQuery), - Predicates.not(Predicates.equalTo(TIME_METRIC))); - for (String metric : names) { - TimeSeries series = timeSeriesRepo.get(metric); - if (series == null) { - JsonObject response = new JsonObject(); - response.addProperty("error", "Unknown metric " + metric); - throw new MetricException(gson.toJson(response)); - } - tsData.add(series.getSamples()); - } - - final long since = Long.parseLong(Optional.fromNullable(sinceQuery).or("0")); - Predicate<List<Number>> sinceFilter = new Predicate<List<Number>>() { - @Override public boolean apply(List<Number> next) { - return next.get(0).longValue() > since; - } - }; - - ResponseStruct response = new ResponseStruct( - ImmutableList.<String>builder().add(TIME_METRIC).addAll(names).build(), - FluentIterable.from(Iterables2.zip(tsData, 0)).filter(sinceFilter).toList()); - return gson.toJson(response); - } - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) - throws ServletException, IOException { - - resp.setContentType(MediaType.JSON_UTF_8.toString()); - PrintWriter out = resp.getWriter(); - try { - out.write(getResponse(req.getParameter(METRICS), req.getParameter(SINCE))); - } catch (MetricException e) { - resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); - out.write(e.getMessage()); - } - } - - @VisibleForTesting - static class ResponseStruct { - // Fields must be non-final for deserialization. 
- List<String> names; - List<List<Number>> data; - - ResponseStruct(List<String> names, List<List<Number>> data) { - this.names = names; - this.data = data; - } - } - - @VisibleForTesting - static class MetricException extends Exception { - MetricException(String message) { - super(message); - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java b/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java deleted file mode 100644 index b9253c8..0000000 --- a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.http.handlers; - -import java.util.Collections; -import java.util.List; - -import javax.servlet.http.HttpServletRequest; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.inject.Inject; - -import com.twitter.common.stats.Stat; - -/** - * HTTP handler that prints all registered variables and their current values. 
- * - * @author William Farner - */ -public class VarsHandler extends TextResponseHandler { - - private static final Function<Stat, String> VAR_PRINTER = new Function<Stat, String>() { - @Override public String apply(Stat stat) { - return stat.getName() + " " + stat.read(); - } - }; - - private final Supplier<Iterable<Stat<?>>> statSupplier; - - /** - * Creates a new handler that will report stats from the provided supplier. - * - * @param statSupplier Stats supplier. - */ - @Inject - public VarsHandler(Supplier<Iterable<Stat<?>>> statSupplier) { - this.statSupplier = Preconditions.checkNotNull(statSupplier); - } - - @Override - public Iterable<String> getLines(HttpServletRequest request) { - List<String> lines = Lists.newArrayList(Iterables.transform(statSupplier.get(), VAR_PRINTER)); - Collections.sort(lines); - return lines; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java b/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java deleted file mode 100644 index a6c105d..0000000 --- a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.net.http.handlers; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Map; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.inject.Inject; - -import com.twitter.common.stats.Stat; - -/** - * A servlet that returns the current value of all variables in JSON format. - * The format returns a JSON object with string fields and typed values: - * <pre> - * { - * "var_a": 1, - * "var_b": 126.0, - * "var_c": "a string value", - * } - * </pre> - * If the optional URL parameter 'pretty' is used, the output will be pretty-printed - * (similar to the above example). - * - * @author William Farner - */ -public class VarsJsonHandler extends HttpServlet { - - private final Supplier<Iterable<Stat<?>>> statSupplier; - - /** - * Creates a new handler that will report stats from the provided supplier. - * - * @param statSupplier Stats supplier. 
- */ - @Inject - public VarsJsonHandler(Supplier<Iterable<Stat<?>>> statSupplier) { - this.statSupplier = Preconditions.checkNotNull(statSupplier); - } - - @VisibleForTesting - String getBody(boolean pretty) { - Map<String, Object> vars = Maps.newLinkedHashMap(); - for (Stat<?> var : statSupplier.get()) { - vars.put(var.getName(), var.read()); - } - return getGson(pretty).toJson(vars); - } - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) - throws ServletException, IOException { - - resp.setContentType("application/json"); - resp.setStatus(HttpServletResponse.SC_OK); - PrintWriter responseBody = resp.getWriter(); - try { - responseBody.print(getBody(req.getParameter("pretty") != null)); - } finally { - responseBody.close(); - } - } - - private Gson getGson(boolean pretty) { - return pretty ? new GsonBuilder().setPrettyPrinting().create() : new Gson(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java deleted file mode 100644 index d2e17c9..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.logging.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import com.twitter.common.net.pool.ResourceExhaustedException; - -/** - * A load balancer that attempts to direct load towards a backend that has the fewest leased - * connections. - * - * @author William Farner - */ -public class LeastConnectedStrategy<S> extends StaticLoadBalancingStrategy<S> { - private static final Logger LOG = Logger.getLogger(LeastConnectedStrategy.class.getName()); - - // Maps from backends to the number of connections made to them. - private final Map<S, ConnectionStats> connections = Maps.newHashMap(); - - // Manages sorting of connection counts, with a reference back to the backend. - private final SortedSet<ConnectionStats> connectionStats = Sets.newTreeSet(); - - /** - * Encapsulates a set of connection stats that allow connections to be sorted as per the least - * connected strategy. - */ - private class ConnectionStats implements Comparable<ConnectionStats> { - final S connectionKey; - final int connectionId; - int activeCount = 0; // Stores the total number of active connections. - long useCount = 0; // Stores the total number times a connection has been used. - - ConnectionStats(S connectionKey, int connectionId) { - this.connectionKey = connectionKey; - this.connectionId = connectionId; - } - - @Override - public int compareTo(ConnectionStats other) { - // Sort by number of active connections first. 
- int difference = activeCount - other.activeCount; - if (difference != 0) { - return difference; - } - - // Sub-sort by total number of times a connection has been used (this will ensure that - // all backends are exercised). - long useDifference = useCount - other.useCount; - if (useDifference != 0) { - return Long.signum(useDifference); - } - - // If the above two are equal, break the tie using the connection id. - return connectionId - other.connectionId; - } - - @Override - public boolean equals(Object o) { - // We use ConnectionStats in a sorted container and so we need to have an equals - // implementation consistent with compareTo, ie: - // (x.compareTo(y) == 0) == x.equals(y) - // We accomplish this directly. - - @SuppressWarnings("unchecked") - ConnectionStats other = (ConnectionStats) o; - return compareTo(other) == 0; - } - - @Override - public String toString() { - return String.format("%d-%d", activeCount, useCount); - } - } - - @Override - protected Collection<S> onBackendsOffered(Set<S> backends) { - Map<S, ConnectionStats> newConnections = Maps.newHashMapWithExpectedSize(backends.size()); - Collection<ConnectionStats> newConnectionStats = - Lists.newArrayListWithCapacity(backends.size()); - - // Recreate all connection stats since their ordering may have changed and this is used for - // comparison tie breaks. 
- int backendId = 0; - for (S backend : backends) { - ConnectionStats stats = new ConnectionStats(backend, backendId++); - - // Retain the activeCount for existing backends to prevent dogpiling existing active servers - ConnectionStats existing = connections.get(backend); - if (existing != null) { - stats.activeCount = existing.activeCount; - } - - newConnections.put(backend, stats); - newConnectionStats.add(stats); - } - - connections.clear(); - connections.putAll(newConnections); - connectionStats.clear(); - connectionStats.addAll(newConnectionStats); - - return connections.keySet(); - } - - @Override - public S nextBackend() throws ResourceExhaustedException { - Preconditions.checkState(connections.size() == connectionStats.size()); - - if (connectionStats.isEmpty()) { - throw new ResourceExhaustedException("No backends."); - } - - return connectionStats.first().connectionKey; - } - - @Override - public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { - Preconditions.checkNotNull(backendKey); - Preconditions.checkState(connections.size() == connectionStats.size()); - Preconditions.checkNotNull(result); - - ConnectionStats stats = connections.get(backendKey); - Preconditions.checkNotNull(stats); - - Preconditions.checkState(connectionStats.remove(stats)); - if (result == ConnectionResult.SUCCESS) { - stats.activeCount++; - } - stats.useCount++; - Preconditions.checkState(connectionStats.add(stats)); - } - - @Override - public void connectionReturned(S backendKey) { - Preconditions.checkNotNull(backendKey); - Preconditions.checkState(connections.size() == connectionStats.size()); - - ConnectionStats stats = connections.get(backendKey); - Preconditions.checkNotNull(stats); - - if (stats.activeCount > 0) { - Preconditions.checkState(connectionStats.remove(stats)); - stats.activeCount--; - Preconditions.checkState(connectionStats.add(stats)); - } else { - LOG.warning("connection stats dropped below zero, ignoring"); - } - } -} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java deleted file mode 100644 index b514b2d..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.twitter.common.base.Closure; -import com.twitter.common.net.pool.ResourceExhaustedException; -import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; - -import java.util.Collection; -import java.util.Set; - -/** - * A load balancer, which will be used to determine which of a set of backends should be connected - * to for service calls. It is expected that the backends themselves can be changed at any time, - * and the load balancer should immediately restrict itself to using only those backends. - * - * It is likely that the load balancer implementation will periodically receive information about - * backends that it technically should no longer know about. 
An example is calls to - * {@link #requestResult(Object, RequestResult, long)} and {@link #released(Object)} for - * in-flight requests after backends were changed by {@link #offerBackends(Set, Closure)}. - * - * @author William Farner - */ -public interface LoadBalancer<K> extends RequestTracker<K> { - - /** - * Offers a set of backends that the load balancer should choose from to distribute load amongst. - * - * @param offeredBackends Backends to choose from. - * @param onBackendsChosen A callback that should be notified when the offered backends have been - * (re)chosen from. - */ - void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen); - - /** - * Gets the next backend that a request should be sent to. - * - * @return Next backend to send a request. - * @throws ResourceExhaustedException If there are no available backends. - */ - K nextBackend() throws ResourceExhaustedException; - - /** - * Signals the load balancer that a connection was made. - * - * @param backend The backend that was connected to. - * @param connectTimeNanos The time spent waiting for the connection to be established. - */ - void connected(K backend, long connectTimeNanos); - - /** - * Signals the load balancer that a connection was attempted, but failed. - * - * @param backend The backend to which connection attempt was made. - * @param result The result of the connection attempt (only FAILED and TIMEOUT are permitted). - */ - void connectFailed(K backend, ConnectionResult result); - - /** - * Signals the load balancer that a connection was released, and is idle. - * - * @param connection Idle connection. 
- */ - void released(K connection); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java deleted file mode 100644 index 5f94948..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import java.util.Collection; -import java.util.Set; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; - -import com.twitter.common.base.Closure; -import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; -import com.twitter.common.net.pool.ResourceExhaustedException; - -/** - * Implementation of a load balancer, that uses a pluggable {@link LoadBalancingStrategy} to define - * actual load balancing behavior. This class handles the responsibility of associating connections - * with backends. - * - * Calls to {@link #connected(Object, long)}, - * {@link #requestResult(Object, RequestResult, long)}, and {@link #released(Object)} will not - * be forwarded for unknown backends/connections. 
- * - * @author William Farner - */ -public class LoadBalancerImpl<K> implements LoadBalancer<K> { - - private final LoadBalancingStrategy<K> strategy; - - private Set<K> offeredBackends = ImmutableSet.of(); - - /** - * Creates a new load balancer that will use the given strategy. - * - * @param strategy Strategy to delegate load balancing work to. - */ - public LoadBalancerImpl(LoadBalancingStrategy<K> strategy) { - this.strategy = Preconditions.checkNotNull(strategy); - } - - @Override - public synchronized void offerBackends(Set<K> offeredBackends, - final Closure<Collection<K>> onBackendsChosen) { - this.offeredBackends = ImmutableSet.copyOf(offeredBackends); - strategy.offerBackends(offeredBackends, new Closure<Collection<K>>() { - @Override public void execute(Collection<K> chosenBackends) { - onBackendsChosen.execute(chosenBackends); - } - }); - } - - @Override - public synchronized K nextBackend() throws ResourceExhaustedException { - return strategy.nextBackend(); - } - - @Override - public synchronized void connected(K backend, long connectTimeNanos) { - Preconditions.checkNotNull(backend); - - if (!hasBackend(backend)) return; - - strategy.addConnectResult(backend, ConnectionResult.SUCCESS, connectTimeNanos); - } - - private boolean hasBackend(K backend) { - return offeredBackends.contains(backend); - } - - @Override - public synchronized void connectFailed(K backend, ConnectionResult result) { - Preconditions.checkNotNull(backend); - Preconditions.checkNotNull(result); - Preconditions.checkArgument(result != ConnectionResult.SUCCESS); - - if (!hasBackend(backend)) return; - - strategy.addConnectResult(backend, result, 0); - } - - @Override - public synchronized void released(K backend) { - Preconditions.checkNotNull(backend); - - if (!hasBackend(backend)) return; - - strategy.connectionReturned(backend); - } - - @Override - public synchronized void requestResult(K backend, RequestResult result, long requestTimeNanos) { - 
Preconditions.checkNotNull(backend); - Preconditions.checkNotNull(result); - - if (!hasBackend(backend)) return; - - strategy.addRequestResult(backend, result, requestTimeNanos); - } - - /** - * Convenience method to create a new load balancer. - * - * @param strategy Strategy to use. - * @param <K> Backend type. - * @return A new load balancer. - */ - public static <K> LoadBalancerImpl<K> - create(LoadBalancingStrategy<K> strategy) { - return new LoadBalancerImpl<K>(strategy); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java deleted file mode 100644 index 08cb9b5..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.twitter.common.base.Closure; -import com.twitter.common.net.pool.ResourceExhaustedException; -import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; - -import java.util.Collection; -import java.util.Set; - -/** - * A strategy for balancing request load among backends. 
- * - * Strategies should be externally synchronized, and therefore do not have to worry about reentrant - * access. - * - * @author William Farner - */ -public interface LoadBalancingStrategy<K> { - - /** - * Offers a set of backends that the load balancer should choose from to distribute load amongst. - * - * @param offeredBackends Backends to choose from. - * @param onBackendsChosen A callback that should be notified when the offered backends have been - * (re)chosen from. - */ - public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen); - - /** - * Gets the next backend that a request should be sent to. - * - * @return Next backend to send a request. - * @throws ResourceExhaustedException If there are no available backends. - */ - public K nextBackend() throws ResourceExhaustedException; - - /** - * Offers information about a connection result. - * - * @param key Backend key. - * @param result Connection result. - * @param connectTimeNanos Time spent waiting for connection to be established. - */ - public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos); - - /** - * Offers information about a connection that was returned. - * - * @param key Backend key. - */ - public void connectionReturned(K key); - - /** - * Offers information about a request result. - * - * @param key Backend key. - * @param result Request result. - * @param requestTimeNanos Time spent waiting for a connection to be established. 
- */ - public void addRequestResult(K key, RequestResult result, long requestTimeNanos); - - enum ConnectionResult { - FAILED, - TIMEOUT, - SUCCESS - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java deleted file mode 100644 index 19b7703..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.net.loadbalancing; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.twitter.common.base.Closure; -import com.twitter.common.net.pool.ResourceExhaustedException; -import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; -import com.twitter.common.util.BackoffDecider; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.logging.Logger; - -/** - * A load balancer that serves as a layer above another load balancer to mark hosts as dead, and - * prevent them from being visible to the wrapped load balancer. - * If all backends become marked as dead, they will all be unmarked. - * - * @author William Farner - */ -public class MarkDeadStrategy<S> implements LoadBalancingStrategy<S> { - private static final Logger LOG = Logger.getLogger(MarkDeadStrategy.class.getName()); - - private final LoadBalancingStrategy<S> wrappedStrategy; - private final Map<S, BackoffDecider> targets = Maps.newHashMap(); - private final Function<S, BackoffDecider> backoffFactory; - protected final Predicate<S> hostChecker; - - private Set<S> liveBackends = null; - private Closure<Collection<S>> onBackendsChosen = null; - - // Flipped when we are in "forced live" mode, where all backends are considered dead and we - // send them all traffic as a last-ditch effort. - private boolean forcedLive = false; - - /** - * Creates a mark dead strategy with a wrapped strategy, backoff decider factory - * and a predicate host checker. Use this constructor if you want to pass in the - * your own implementation of the host checker. 
- * - * @param wrappedStrategy one of the implementations of the load balancing strategy. - * @param backoffFactory backoff decider factory per host. - * @param hostChecker predicate that returns {@code true} if the host is alive, otherwise returns {@code false}. - */ - public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy, - Function<S, BackoffDecider> backoffFactory, Predicate<S> hostChecker) { - this.wrappedStrategy = Preconditions.checkNotNull(wrappedStrategy); - this.backoffFactory = Preconditions.checkNotNull(backoffFactory); - this.hostChecker = Preconditions.checkNotNull(hostChecker); - } - - /** - * Constructor that uses a default predicate host checker that always returns true. - * This is the default constructor that all consumers of MarkDeadStrategy currently use. - * - * @param wrappedStrategy one of the implementations of the load balancing strategy. - * @param backoffFactory backoff decider factory per host. - */ - public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy, - Function<S, BackoffDecider> backoffFactory) { - this(wrappedStrategy, backoffFactory, Predicates.<S>alwaysTrue()); - } - - @Override - public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) { - this.onBackendsChosen = onBackendsChosen; - targets.keySet().retainAll(offeredBackends); - for (S backend : offeredBackends) { - if (!targets.containsKey(backend)) { - targets.put(backend, backoffFactory.apply(backend)); - } - } - - adjustBackends(); - } - - @Override - public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { - Preconditions.checkNotNull(backendKey); - Preconditions.checkNotNull(result); - - BackoffDecider decider = targets.get(backendKey); - Preconditions.checkNotNull(decider); - - addResult(decider, result); - if (shouldNotifyFor(backendKey)) { - wrappedStrategy.addConnectResult(backendKey, result, connectTimeNanos); - } - } - - @Override - public void connectionReturned(S 
backendKey) { - Preconditions.checkNotNull(backendKey); - - if (shouldNotifyFor(backendKey)) { - wrappedStrategy.connectionReturned(backendKey); - } - } - - @Override - public void addRequestResult(S requestKey, RequestResult result, - long requestTimeNanos) { - Preconditions.checkNotNull(requestKey); - Preconditions.checkNotNull(result); - - BackoffDecider decider = targets.get(requestKey); - Preconditions.checkNotNull(decider); - - addResult(decider, result); - if (shouldNotifyFor(requestKey)) { - wrappedStrategy.addRequestResult(requestKey, result, requestTimeNanos); - } - } - - private void addResult(BackoffDecider decider, ConnectionResult result) { - switch (result) { - case FAILED: - case TIMEOUT: - addResult(decider, false); - break; - case SUCCESS: - addResult(decider, true); - break; - default: - throw new UnsupportedOperationException("Unhandled result type " + result); - } - } - - private void addResult(BackoffDecider decider, RequestTracker.RequestResult result) { - switch (result) { - case FAILED: - case TIMEOUT: - addResult(decider, false); - break; - case SUCCESS: - addResult(decider, true); - break; - default: - throw new UnsupportedOperationException("Unhandled result type " + result); - } - } - - private void addResult(BackoffDecider decider, boolean success) { - if (success) { - decider.addSuccess(); - } else { - decider.addFailure(); - } - - // Check if any of the backends have moved into or out of dead state. - for (Map.Entry<S, BackoffDecider> entry : targets.entrySet()) { - boolean dead = entry.getValue().shouldBackOff(); - boolean markedDead = !liveBackends.contains(entry.getKey()); - - // only check the servers that were marked dead before and see if we can - // connect to them, otherwise set dead to true. 
- if (markedDead && !dead) { - boolean alive = hostChecker.apply(entry.getKey()); - if (!alive) { - entry.getValue().transitionToBackOff(0, true); - } - dead = !alive; - } - - if (dead && !markedDead && forcedLive) { - // Do nothing here. Since we have forced all backends to be live, we don't want to - // continually advertise the backend list to the wrapped strategy. - } else if (dead != markedDead || !dead && forcedLive) { - adjustBackends(); - break; - } - } - } - - private boolean shouldNotifyFor(S backend) { - return liveBackends.contains(backend); - } - - private final Predicate<S> deadTargetFilter = new Predicate<S>() { - @Override public boolean apply(S backend) { - return !targets.get(backend).shouldBackOff(); - } - }; - - private void adjustBackends() { - liveBackends = Sets.newHashSet(Iterables.filter(targets.keySet(), deadTargetFilter)); - if (liveBackends.isEmpty()) { - liveBackends = targets.keySet(); - forcedLive = true; - } else { - forcedLive = false; - } - LOG.info("Observed backend state change, changing live backends to " + liveBackends); - wrappedStrategy.offerBackends(liveBackends, onBackendsChosen); - } - - @Override - public S nextBackend() throws ResourceExhaustedException { - return wrappedStrategy.nextBackend(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java deleted file mode 100644 index 607d327..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance 
with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import java.util.Map; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.Maps; - -import com.twitter.common.util.BackoffDecider; - -/** - * A load balancing strategy that extends the functionality of the mark dead strategy by - * integrating a hostChecker that allows hosts to transition out of a dead state - * if the most recent connection to the host was successful. - * - * @param <S> typically socket address of a backend host. - * @author Krishna Gade - */ -public class MarkDeadStrategyWithHostCheck<S> extends MarkDeadStrategy<S> { - - /** - * LiveHostChecker implements Filter to determine whether a host is alive based on the - * result of the most recent connection attempt to that host. It keeps a map of - * backend -> last connection result, which gets updated every time someone tries to - * add to connection result. - */ - protected static class LiveHostChecker<S> implements Predicate<S> { - private final Map<S, ConnectionResult> lastConnectionResult = Maps.newHashMap(); - - /** - * Adds the connection result of this backend to the last connection result map. - * - * @param backend typically the socket address of the backend. - * @param result result of what happened when the client tried to connect to this backend. 
- */ - public void addConnectResult(S backend, ConnectionResult result) { - lastConnectionResult.put(backend, result); - } - - /** - * Checks if the last connection result for this backend and returns {@code true} if it - * was {@link LoadBalancingStrategy.ConnectionResult#SUCCESS} otherwise returns {@code false}. - * - * @param backend typically the socket address of the backend. - */ - @Override public boolean apply(S backend) { - ConnectionResult result = lastConnectionResult.get(backend); - return result != null && result == ConnectionResult.SUCCESS; - } - } - - // Reference to the host checker we pass to the super class. - // We keep it here to avoid casting on every access to it. - protected final LiveHostChecker<S> liveHostChecker; - - /** - * Creates a mark dead strategy with the given wrapped strategy and backoff decider factory. - * It uses a hostChecker {@link Predicate} that allows hosts to transition out - * of a dead state if the most recent connection to the host was successful. - * - * @param wrappedStrategy one of the implementations of the load balancing strategy. - * @param backoffFactory backoff decider factory per host. - */ - public MarkDeadStrategyWithHostCheck(LoadBalancingStrategy<S> wrappedStrategy, - Function<S, BackoffDecider> backoffFactory) { - super(wrappedStrategy, backoffFactory, new LiveHostChecker<S>()); - // Casting to LiveHostChecker is safe here as that's the only predicate that we pass to super. - this.liveHostChecker = ((LiveHostChecker<S>) hostChecker); - } - - - /** - * Overrides the base class implementation by adding this connection result to the - * host checker. - * - * @param backendKey typically the socket address of the backend. - * @param result result of what happened when the client tried to connect to this backend. - * @param connectTimeNanos time took to connect to the backend in nano seconds. 
- */ - @Override - public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { - liveHostChecker.addConnectResult(backendKey, result); - super.addConnectResult(backendKey, result, connectTimeNanos); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java deleted file mode 100644 index b634d95..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.twitter.common.net.pool.ResourceExhaustedException; - -import java.util.Collection; -import java.util.List; -import java.util.Random; -import java.util.Set; - -/** - * A load balancer that selects a random backend each time a request is made.. 
- * - * @author William Farner - */ -public class RandomStrategy<S> extends StaticLoadBalancingStrategy<S> { - - private List<S> targets = Lists.newArrayList(); - private final Random random; - - public RandomStrategy() { - this(new Random()); - } - - @VisibleForTesting - RandomStrategy(Random random) { - this.random = Preconditions.checkNotNull(random); - } - - @Override - protected Collection<S> onBackendsOffered(Set<S> targets) { - this.targets = ImmutableList.copyOf(targets); - return this.targets; - } - - @Override - public S nextBackend() throws ResourceExhaustedException { - if (targets.isEmpty()) throw new ResourceExhaustedException("No backends."); - return targets.get(random.nextInt(targets.size())); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java deleted file mode 100644 index 6450222..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -/** - * Tracks requests made to a backend service. 
/**
 * Receives the outcomes of requests made to a backend service.
 *
 * @param <T> type of the key that identifies the owner of a request.
 * @author William Farner
 */
public interface RequestTracker<T> {

  /** Possible outcomes of a request. */
  enum RequestResult {
    FAILED,
    TIMEOUT,
    SUCCESS
  }

  /**
   * Reports a completed request to the tracker.
   *
   * @param key Key to identify the owner of the request.
   * @param result Result of the request.
   * @param requestTimeNanos Time duration spent waiting for the request to complete.
   */
  void requestResult(T key, RequestResult result, long requestTimeNanos);
}
- */ -package com.twitter.common.net.loadbalancing; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.twitter.common.net.pool.ResourceExhaustedException; - -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -/** - * A load balancer that distributes load by randomizing the list of available backends, and then - * rotating through them evenly. - * - * @author William Farner - */ -public class RoundRobinStrategy<S> extends StaticLoadBalancingStrategy<S> { - - private Iterator<S> iterator = Iterators.emptyIterator(); - - @Override - protected Collection<S> onBackendsOffered(Set<S> targets) { - List<S> newTargets = Lists.newArrayList(targets); - Collections.shuffle(newTargets); - iterator = Iterators.cycle(newTargets); - return newTargets; - } - - @Override - public S nextBackend() throws ResourceExhaustedException { - if (!iterator.hasNext()) throw new ResourceExhaustedException("No backends available!"); - return iterator.next(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java deleted file mode 100644 index 483e799..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.twitter.common.base.Closure; -import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; - -import java.util.Collection; -import java.util.Set; - -/** - * A baseclass for LoadBalancingStrategies that use a static set of backends they are - * {@link #offerBackends(java.util.Set, com.twitter.common.base.Closure) offered}. Also acts as an - * adapter, providing no-op implementations of all other LoadBalancingStrategy methods that only - * need be overridden as required by subclass features. - * - * @author John Sirois - */ -abstract class StaticLoadBalancingStrategy<K> implements LoadBalancingStrategy<K> { - - @Override - public final void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) { - onBackendsChosen.execute(onBackendsOffered(offeredBackends)); - } - - /** - * Subclasses must override and return a collection of the backends actually chosen for use until - * the next offer round. - * - * @param offeredBackends The backends offered in a {@link - * #offerBackends(java.util.Set, com.twitter.common.base.Closure)} event. - * @return The collection of backends that will be used until the next offer event. - */ - protected abstract Collection<K> onBackendsOffered(Set<K> offeredBackends); - - @Override - public void addConnectResult(K backendKey, ConnectionResult result, long connectTimeNanos) { - // No-op. - } - - @Override - public void connectionReturned(K backendKey) { - // No-op. 
- } - - @Override - public void addRequestResult(K requestKey, RequestResult result, long requestTimeNanos) { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java deleted file mode 100644 index 104729b..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.twitter.common.base.Closure; -import com.twitter.common.net.pool.ResourceExhaustedException; -import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -/** - * A load balancer that maintains a fixed upper bound on the number of backends that will be made - * available for a wrapped load balancer. - * - * TODO(William Farner): May want to consider periodically swapping subsets. 
- * - * TODO(William Farner): May want to catch ResourceExhaustedExceptions from wrapped strategy and adjust - * subset if possible. - * - * @author William Farner - */ -public class SubsetStrategy<S> implements LoadBalancingStrategy<S> { - private final LoadBalancingStrategy<S> wrapped; - private final int maxBackends; - - private Set<S> backendSubset = Sets.newHashSet(); - - public SubsetStrategy(int maxBackends, LoadBalancingStrategy<S> wrapped) { - Preconditions.checkArgument(maxBackends > 0); - this.maxBackends = maxBackends; - this.wrapped = Preconditions.checkNotNull(wrapped); - } - - @Override - public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) { - List<S> allTargets = Lists.newArrayList(offeredBackends); - Collections.shuffle(allTargets); - backendSubset = ImmutableSet.copyOf( - allTargets.subList(0, Math.min(maxBackends, allTargets.size()))); - wrapped.offerBackends(backendSubset, onBackendsChosen); - } - - @Override - public void addConnectResult(S backendKey, ConnectionResult result, - long connectTimeNanos) { - if (backendSubset.contains(backendKey)) { - wrapped.addConnectResult(backendKey, result, connectTimeNanos); - } - } - - @Override - public void connectionReturned(S backendKey) { - if (backendSubset.contains(backendKey)) { - wrapped.connectionReturned(backendKey); - } - } - - @Override - public void addRequestResult(S requestKey, RequestResult result, long requestTimeNanos) { - Preconditions.checkNotNull(requestKey); - - if (backendSubset.contains(requestKey)) { - wrapped.addRequestResult(requestKey, result, requestTimeNanos); - } - } - - @Override - public S nextBackend() throws ResourceExhaustedException { - return wrapped.nextBackend(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java ---------------------------------------------------------------------- diff --git 
a/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java deleted file mode 100644 index e3bf25b..0000000 --- a/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.loadbalancing; - -import com.google.common.base.Preconditions; -import com.twitter.common.base.Closure; -import com.twitter.common.net.pool.ResourceExhaustedException; -import com.twitter.common.net.monitoring.TrafficMonitor; - -import java.util.Collection; -import java.util.Set; - -/** - * @author William Farner - */ -public class TrafficMonitorAdapter<K> implements LoadBalancingStrategy<K> { - private final LoadBalancingStrategy<K> strategy; - private final TrafficMonitor<K> monitor; - - public TrafficMonitorAdapter(LoadBalancingStrategy<K> strategy, TrafficMonitor<K> monitor) { - this.strategy = Preconditions.checkNotNull(strategy); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public static <K> TrafficMonitorAdapter<K> create(LoadBalancingStrategy<K> strategy, - TrafficMonitor<K> monitor) { - return new TrafficMonitorAdapter<K>(strategy, monitor); - } - - @Override - public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) { - strategy.offerBackends(offeredBackends, 
onBackendsChosen); - } - - @Override - public K nextBackend() throws ResourceExhaustedException { - return strategy.nextBackend(); - } - - @Override - public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos) { - strategy.addConnectResult(key, result, connectTimeNanos); - if (result == ConnectionResult.SUCCESS) monitor.connected(key); - } - - @Override - public void connectionReturned(K key) { - strategy.connectionReturned(key); - monitor.released(key); - } - - @Override - public void addRequestResult(K key, RequestTracker.RequestResult result, long requestTimeNanos) { - strategy.addRequestResult(key, result, requestTimeNanos); - monitor.requestResult(key, result, requestTimeNanos); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java b/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java deleted file mode 100644 index cd881bf..0000000 --- a/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.monitoring; - -/** - * Monitors active connections between two hosts.. 
- * - * @author William Farner - */ -public interface ConnectionMonitor<K> { - - /** - * Instructs the monitor that a connection was established. - * - * @param connectionKey Key for the host that a connection was established with. - */ - public void connected(K connectionKey); - - /** - * Informs the monitor that a connection was released. - * - * @param connectionKey Key for the host that a connection was released for. - */ - public void released(K connectionKey); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java b/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java deleted file mode 100644 index fd5b577..0000000 --- a/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.net.monitoring; - -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.concurrent.GuardedBy; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.net.loadbalancing.RequestTracker; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; -import com.twitter.common.util.concurrent.ExecutorServiceShutdown; - -/** - * Monitors activity on established connections between two hosts. This can be used for a server - * to track inbound clients, or for a client to track requests sent to different servers. - * - * The monitor will retain information for hosts that may no longer be active, but will expunge - * information for hosts that have been idle for more than five minutes. 
- * - * @author William Farner - */ -public class TrafficMonitor<K> implements ConnectionMonitor<K>, RequestTracker<K> { - - @VisibleForTesting - static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, Time.MINUTES); - - @GuardedBy("this") - private final LoadingCache<K, TrafficInfo> trafficInfos; - - private final String serviceName; - private final Amount<Long, Time> gcInterval; - - private AtomicLong lifetimeRequests = new AtomicLong(); - private final Clock clock; - private final ScheduledExecutorService gcExecutor; - - /** - * Creates a new traffic monitor using the default cleanup interval. - * - * @param serviceName Name of the service to monitor, used for creating variable names. - */ - public TrafficMonitor(final String serviceName) { - this(serviceName, DEFAULT_GC_INTERVAL); - } - - /** - * Creates a new traffic monitor with a custom cleanup interval. - * - * @param serviceName Service name for the monitor. - * @param gcInterval Interval on which the remote host garbage collector should run. - */ - public TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval) { - this(serviceName, gcInterval, Clock.SYSTEM_CLOCK); - } - - /** - * Convenience method to create a typed traffic monitor. - * - * @param serviceName Service name for the monitor. - * @param <T> Monitor type. - * @return A new traffic monitor. 
- */ - public static <T> TrafficMonitor<T> create(String serviceName) { - return new TrafficMonitor<T>(serviceName); - } - - @VisibleForTesting - TrafficMonitor(final String serviceName, Clock clock) { - this(serviceName, DEFAULT_GC_INTERVAL, clock); - } - - private TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval, Clock clock) { - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - this.clock = Preconditions.checkNotNull(clock); - Preconditions.checkNotNull(gcInterval); - Preconditions.checkArgument(gcInterval.getValue() > 0, "GC interval must be > zero."); - this.gcInterval = gcInterval; - - trafficInfos = CacheBuilder.newBuilder().build(new CacheLoader<K, TrafficInfo>() { - @Override public TrafficInfo load(K key) { - return new TrafficInfo(key); - } - }); - - Runnable gc = new Runnable() { - @Override public void run() { gc(); } - }; - - gcExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("TrafficMonitor-gc-%d").build()); - gcExecutor.scheduleAtFixedRate(gc, gcInterval.as(Time.SECONDS), gcInterval.as(Time.SECONDS), - TimeUnit.SECONDS); - } - - /** - * Gets the name of the service that this monitor is monitoring. - * - * @return Monitor's service name. - */ - public String getServiceName() { - return serviceName; - } - - /** - * Gets the total number of requests that this monitor has observed, for all remote hosts. - * - * @return Total number of requests observed. - */ - public long getLifetimeRequestCount() { - return lifetimeRequests.get(); - } - - /** - * Fetches all current traffic information. - * - * @return A map from the host key type to information about that host. 
- */ - public synchronized Map<K, TrafficInfo> getTrafficInfo() { - return ImmutableMap.copyOf(trafficInfos.asMap()); - } - - @Override - public synchronized void connected(K key) { - Preconditions.checkNotNull(key); - - trafficInfos.getUnchecked(key).incConnections(); - } - - @Override - public synchronized void released(K key) { - Preconditions.checkNotNull(key); - - TrafficInfo info = trafficInfos.getUnchecked(key); - - Preconditions.checkState(info.getConnectionCount() > 0, "Double release detected!"); - info.decConnections(); - } - - @Override - public void requestResult(K key, RequestResult result, long requestTimeNanos) { - Preconditions.checkNotNull(key); - - lifetimeRequests.incrementAndGet(); - trafficInfos.getUnchecked(key).addResult(result); - } - - @VisibleForTesting - synchronized void gc() { - Iterables.removeIf(trafficInfos.asMap().entrySet(), - new Predicate<Map.Entry<K, TrafficInfo>>() { - @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) { - if (clientInfo.getValue().connections.get() > 0) return false; - - long idlePeriod = clock.nowNanos() - clientInfo.getValue().getLastActiveTimestamp(); - - return idlePeriod > gcInterval.as(Time.NANOSECONDS); - } - }); - } - - /** - * Shuts down TrafficMonitor by stopping background gc task. - */ - public void shutdown() { - new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, Time.SECONDS)).execute(); - } - - /** - * Information about traffic obsserved to/from a specific host. 
- */ - public class TrafficInfo { - private final K key; - private AtomicInteger requestSuccesses = new AtomicInteger(); - private AtomicInteger requestFailures = new AtomicInteger(); - private AtomicInteger connections = new AtomicInteger(); - private AtomicLong lastActive = new AtomicLong(); - - TrafficInfo(K key) { - this.key = key; - pulse(); - } - - void pulse() { - lastActive.set(clock.nowNanos()); - } - - public K getKey() { - return key; - } - - void addResult(RequestResult result) { - pulse(); - switch (result) { - case SUCCESS: - requestSuccesses.incrementAndGet(); - break; - case FAILED: - case TIMEOUT: - requestFailures.incrementAndGet(); - break; - } - } - - public int getRequestSuccessCount() { - return requestSuccesses.get(); - } - - public int getRequestFailureCount() { - return requestFailures.get(); - } - - int incConnections() { - pulse(); - return connections.incrementAndGet(); - } - - int decConnections() { - pulse(); - return connections.decrementAndGet(); - } - - public int getConnectionCount() { - return connections.get(); - } - - public long getLastActiveTimestamp() { - return lastActive.get(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/Connection.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/Connection.java b/commons/src/main/java/com/twitter/common/net/pool/Connection.java deleted file mode 100644 index cf8f1a4..0000000 --- a/commons/src/main/java/com/twitter/common/net/pool/Connection.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.pool; - -import com.google.common.base.Supplier; - -import java.io.Closeable; - -/** - * An interface to a connection resource that may become invalid. - * - * @author John Sirois - */ -public interface Connection<T, E> extends Supplier<T>, Closeable { - - /** - * This will always be the same underlying connection for the lifetime of this object. - * - * @return the connection - */ - @Override T get(); - - /** - * @return {@code true} if the supplied connection is valid for use. - */ - boolean isValid(); - - /** - * Closes this connection. - */ - void close(); - - /** - * @return the endpoint this connection is connected to. - */ - E getEndpoint(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java deleted file mode 100644 index 7b87bc7..0000000 --- a/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.pool; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * A factory for connections that also dictates policy for the size of the connection population. - * - * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null protocol is already tangling - * implementation code - * - * @author John Sirois - */ -public interface ConnectionFactory<S extends Connection<?, ?>> { - - /** - * Checks whether this factory might create a connection if requested. - * - * @return {@code} true if this factory might create a connection at this point in time; ie - * a call to {@link #create} might not have returned {@code null}. May return true to multiple - * threads if concurrently creating connections. - */ - boolean mightCreate(); - - /** - * Attempts to create a new connection within the given timeout and subject to this factory's - * connection population size policy. - * - * @param timeout the maximum amount of time to wait - * @return a new connection or null if there are too many connections already - * @throws Exception if there was a problem creating the connection or establishing the connection - * takes too long - */ - S create(Amount<Long, Time> timeout) throws Exception; - - /** - * Destroys a connection. It is an error to attempt to destroy a connection this factory did - * not {@link #create} - * - * @param connection The connection to destroy. 
- */ - void destroy(S connection); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java deleted file mode 100644 index 81d7684..0000000 --- a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.net.pool; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.base.Supplier; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; -import com.twitter.common.stats.StatsProvider; - -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and - * connection choice to a supplied strategy. - * - * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in - * use. 
- * - * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command - * - * @author John Sirois - */ -public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> { - - private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName()); - - private final Set<S> leasedConnections = - Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap()); - private final Set<S> availableConnections = Sets.newHashSet(); - private final Lock poolLock; - private final Condition available; - - private final ConnectionFactory<S> connectionFactory; - private final Executor executor; - - private volatile boolean closed; - private final AtomicLong connectionsCreated; - private final AtomicLong connectionsDestroyed; - private final AtomicLong connectionsReturned; - - /** - * Creates a connection pool with a connection picker that selects the first item in the set of - * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}. - * - * @param connectionFactory Factory to create and destroy connections. - */ - public ConnectionPool(ConnectionFactory<S> connectionFactory) { - this(connectionFactory, Stats.STATS_PROVIDER); - } - - /** - * Creates a connection pool with a connection picker that selects the first item in the set of - * available connections and uses the supplied StatsProvider to register stats with. - * - * @param connectionFactory Factory to create and destroy connections. - * @param statsProvider Stats export provider. 
- */ - public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) { - this(Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("CP-" + connectionFactory + "[%d]") - .setDaemon(true) - .build()), - new ReentrantLock(true), connectionFactory, statsProvider); - } - - @VisibleForTesting - ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory, - StatsProvider statsProvider) { - Preconditions.checkNotNull(executor); - Preconditions.checkNotNull(poolLock); - Preconditions.checkNotNull(connectionFactory); - Preconditions.checkNotNull(statsProvider); - - this.executor = executor; - this.poolLock = poolLock; - available = poolLock.newCondition(); - this.connectionFactory = connectionFactory; - - String cfName = Stats.normalizeName(connectionFactory.toString()); - statsProvider.makeGauge("cp_leased_connections_" + cfName, - new Supplier<Integer>() { - @Override public Integer get() { - return leasedConnections.size(); - } - }); - statsProvider.makeGauge("cp_available_connections_" + cfName, - new Supplier<Integer>() { - @Override public Integer get() { - return availableConnections.size(); - } - }); - this.connectionsCreated = - statsProvider.makeCounter("cp_created_connections_" + cfName); - this.connectionsDestroyed = - statsProvider.makeCounter("cp_destroyed_connections_" + cfName); - this.connectionsReturned = - statsProvider.makeCounter("cp_returned_connections_" + cfName); - } - - @Override - public String toString() { - return "CP-" + connectionFactory; - } - - @Override - public S get() throws ResourceExhaustedException, TimeoutException { - checkNotClosed(); - poolLock.lock(); - try { - return leaseConnection(NO_TIMEOUT); - } finally { - poolLock.unlock(); - } - } - - @Override - public S get(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - - checkNotClosed(); - Preconditions.checkNotNull(timeout); - if (timeout.getValue() == 0) { - return 
get(); - } - - try { - long start = System.nanoTime(); - long timeBudgetNs = timeout.as(Time.NANOSECONDS); - if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) { - try { - timeBudgetNs -= (System.nanoTime() - start); - return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS)); - } finally { - poolLock.unlock(); - } - } else { - throw new TimeoutException("Timed out waiting for pool lock"); - } - } catch (InterruptedException e) { - throw new TimeoutException("Interrupted waiting for pool lock"); - } - } - - private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException, - TimeoutException { - S connection = getConnection(timeout); - if (connection == null) { - throw new ResourceExhaustedException("Connection pool resources exhausted"); - } - return leaseConnection(connection); - } - - @Override - public void release(S connection) { - release(connection, false); - } - - /** - * Equivalent to releasing a Connection with isValid() == false. - * @see ObjectPool#remove(Object) - */ - @Override - public void remove(S connection) { - release(connection, true); - } - - // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create - // connection - reason about this and potentially submit release to our executor - private void release(S connection, boolean remove) { - poolLock.lock(); - try { - if (!leasedConnections.remove(connection)) { - throw new IllegalArgumentException("Connection not controlled by this connection pool: " - + connection); - } - - if (!closed && !remove && connection.isValid()) { - addConnection(connection); - connectionsReturned.incrementAndGet(); - } else { - connectionFactory.destroy(connection); - connectionsDestroyed.incrementAndGet(); - } - } finally { - poolLock.unlock(); - } - } - - @Override - public void close() { - poolLock.lock(); - try { - for (S availableConnection : availableConnections) { - connectionFactory.destroy(availableConnection); - } - } finally { - 
closed = true; - poolLock.unlock(); - } - } - - private void checkNotClosed() { - Preconditions.checkState(!closed); - } - - private S leaseConnection(S connection) { - leasedConnections.add(connection); - return connection; - } - - // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be - // fixed but there may be no need - do gedankanalysis - private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException, - TimeoutException { - if (availableConnections.isEmpty()) { - if (leasedConnections.isEmpty()) { - // Completely empty pool - try { - return createConnection(timeout); - } catch (Exception e) { - throw new ResourceExhaustedException("failed to create a new connection", e); - } - } else { - // If the pool is allowed to grow - let the connection factory race a release - if (connectionFactory.mightCreate()) { - executor.execute(new Runnable() { - @Override public void run() { - try { - // The connection timeout is not needed here to honor the callers get requested - // timeout, but we don't want to have an infinite timeout which could exhaust a - // thread pool over many backgrounded create calls - S connection = createConnection(timeout); - if (connection != null) { - addConnection(connection); - } else { - LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " + - "due to maximum pool size or timeout"); - } - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e); - } - } - }); - } - - try { - // We wait for a returned/new connection here in loops to guard against the - // "spurious wakeups" that are documented can occur with Condition.await() - if (timeout.getValue() == 0) { - while(availableConnections.isEmpty()) { - available.await(); - } - } else { - long timeRemainingNs = timeout.as(Time.NANOSECONDS); - while(availableConnections.isEmpty()) { - long start = System.nanoTime(); - if 
(!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) { - throw new TimeoutException( - "timeout waiting for a connection to be released to the pool"); - } else { - timeRemainingNs -= (System.nanoTime() - start); - } - } - if (availableConnections.isEmpty()) { - throw new TimeoutException( - "timeout waiting for a connection to be released to the pool"); - } - } - } catch (InterruptedException e) { - throw new TimeoutException("Interrupted while waiting for a connection."); - } - } - } - - return getAvailableConnection(); - } - - private S getAvailableConnection() { - S connection = (availableConnections.size() == 1) - ? Iterables.getOnlyElement(availableConnections) - : availableConnections.iterator().next(); - if (!availableConnections.remove(connection)) { - throw new IllegalArgumentException("Connection picked not in pool: " + connection); - } - return connection; - } - - private S createConnection(Amount<Long, Time> timeout) throws Exception { - S connection = connectionFactory.create(timeout); - if (connection != null) { - connectionsCreated.incrementAndGet(); - } - return connection; - } - - private void addConnection(S connection) { - poolLock.lock(); - try { - availableConnections.add(connection); - available.signal(); - } finally { - poolLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java deleted file mode 100644 index 6b68513..0000000 --- a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.net.pool; - -import com.google.common.collect.ImmutableSet; - -import com.twitter.common.base.Command; - -/** - * A host set that can be monitored for changes. - * - * @param <T> The type that is used to identify members of the host set. - */ -public interface DynamicHostSet<T> { - - /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. - * The monitor will be notified if the membership set or parameters of existing members have - * changed. - * - * @param monitor the server set monitor to call back when the host set changes - * @throws MonitorException if there is a problem monitoring the host set - * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)} - */ - @Deprecated - public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException; - - /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. - * The monitor will be notified if the membership set or parameters of existing members have - * changed. - * - * @param monitor the server set monitor to call back when the host set changes - * @return A command which, when executed, will stop monitoring the host set. 
- * @throws MonitorException if there is a problem monitoring the host set - */ - public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException; - - /** - * An interface to an object that is interested in receiving notification whenever the host set - * changes. - */ - public static interface HostChangeMonitor<T> { - - /** - * Called when either the available set of services changes (when a service dies or a new - * instance comes on-line) or when an existing service advertises a status or health change. - * - * @param hostSet the current set of available ServiceInstances - */ - void onChange(ImmutableSet<T> hostSet); - } - - public static class MonitorException extends Exception { - public MonitorException(String msg) { - super(msg); - } - - public MonitorException(String msg, Throwable cause) { - super(msg, cause); - } - } -}
