http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Stat.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Stat.java b/commons/src/main/java/com/twitter/common/util/Stat.java new file mode 100644 index 0000000..1f430eb --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/Stat.java @@ -0,0 +1,360 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +/** ************************************************************************ + ** Summize + ** This work protected by US Copyright Law and contains proprietary and + ** confidential trade secrets. + ** (c) Copyright 2007 Summize, ALL RIGHTS RESERVED. 
/**
 * Collects basic running statistics over a stream of numbers.
 *
 * <p>Values are fed in through the {@code addNumber} overloads; the count, sum, sum of squares,
 * mean, sample standard deviation, minimum and maximum can be queried at any time. Call
 * {@link #clear()} to reset the instance for reuse.
 *
 * <p>Every method that touches the counters synchronizes on this instance, so a single
 * {@code Stat} can be shared between threads. Stream I/O is always performed outside the lock.
 *
 * <p>Field names are part of the Java serialized form ({@code serialVersionUID = 1L}) and must
 * not be renamed.
 */
public class Stat implements Serializable {

  /**
   * Adds a number to the statistics collector. All values are accumulated as doubles.
   *
   * @param x number added to the statistics.
   */
  public void addNumber(int x) {
    addNumber((double) x);
  }

  /**
   * Adds a number to the statistics collector. All values are accumulated as doubles.
   *
   * @param x number added to the statistics.
   */
  public void addNumber(float x) {
    addNumber((double) x);
  }

  /**
   * Adds a number to the statistics collector.
   *
   * @param x number added to the statistics.
   */
  public synchronized void addNumber(double x) {
    // The first sample seeds both extremes. Comparing only against the idle values
    // (0 for max) would report a maximum of 0 for a data set that is entirely negative.
    if (_number == 0 || x > _max) {
      _max = x;
    }
    if (_number == 0 || x < _min) {
      _min = x;
    }

    _sum += x;
    _sumOfSq += (x * x);
    _number++;
  }

  /**
   * Resets every counter to its initial (empty) value.
   */
  public synchronized void clear() {
    _max = 0;
    _min = Double.MAX_VALUE;
    _number = 0;
    _mean = 0;
    _stdDev = 0;
    _sum = 0;
    _sumOfSq = 0;
  }

  /**
   * Creates a string representation of the statistics collected so far. The layout is fixed and
   * may not suit all needs; callers wanting a different format should use the individual getters.
   *
   * @return formatted summary, identical to {@code toString(false)}.
   */
  @Override
  public String toString() {
    return toString(false);
  }

  /**
   * Creates a string representation of the statistics collected so far.
   *
   * <p>With more than one sample the result lists Number, Max, Min, Mean, Sum and STD. With
   * exactly one sample only the sample value itself is printed (under the {@code Number:} label),
   * and with no samples the result is {@code "Number: N/A"}. Numbers are rendered with the default
   * locale's {@link NumberFormat} and at most four fraction digits.
   *
   * @param percent when {@code true}, Number, Max, Min and Mean are multiplied by 100 and suffixed
   *     with {@code %}; Sum and STD are never scaled.
   * @return formatted summary of the collected statistics.
   */
  public synchronized String toString(boolean percent) {
    calculate();
    NumberFormat nf = NumberFormat.getInstance();
    nf.setMaximumFractionDigits(4);

    if (_number > 1) {
      StringBuilder results = new StringBuilder();
      // The count is formatted through the long overload, exactly as before.
      results.append("Number:")
          .append(percent ? nf.format(_number * 100) + "%" : nf.format(_number));
      results.append(" Max:").append(render(nf, _max, percent));
      results.append(" Min:").append(render(nf, _min, percent));
      results.append(" Mean:").append(render(nf, _mean, percent));
      results.append(" Sum:").append(nf.format(_sum));
      results.append(" STD:").append(nf.format(_stdDev));
      return results.toString();
    } else if (_number == 1) {
      return "Number:" + render(nf, _sum, percent);
    } else {
      return "Number: N/A";
    }
  }

  /** Formats {@code value}, optionally scaled by 100 and suffixed with a percent sign. */
  private static String render(NumberFormat nf, double value, boolean percent) {
    return percent ? nf.format(value * 100) + "%" : nf.format(value);
  }

  /** Refreshes the cached mean and standard deviation from the running sums. */
  private void calculate() {
    getMean();
    getStandardDev();
  }

  /**
   * Gets the largest value added so far.
   *
   * @return maximum entry added so far, or {@code 0} if nothing has been added.
   */
  public synchronized double getMax() {
    return _max;
  }

  /**
   * Gets the smallest value added so far.
   *
   * @return minimum entry added so far, or {@link Double#MAX_VALUE} if nothing has been added.
   */
  public synchronized double getMin() {
    return _min;
  }

  /**
   * Gets the number of values added so far.
   *
   * @return number of entries added so far.
   */
  public synchronized long getNumberOfElements() {
    return _number;
  }

  /**
   * Gets the arithmetic mean of the values added so far.
   *
   * @return mean of the entries added so far; when no entries have been added, the last cached
   *     value (0 after construction or {@link #clear()}).
   */
  public synchronized double getMean() {
    if (_number > 0) {
      _mean = _sum / _number;
    }
    return _mean;
  }

  /**
   * Gets the mean of the values added so far, multiplied by 100.
   *
   * @return the mean expressed as a percentage.
   */
  public synchronized double getPercent() {
    // Scale a local result only: storing the scaled value back into _mean would make a
    // following writeToDataOutput() emit mean * 100 as the mean.
    return getMean() * 100;
  }

  /**
   * Gets the sum of the values added so far.
   *
   * @return sum of entries added so far.
   */
  public synchronized double getSum() {
    return _sum;
  }

  /**
   * Gets the sum of the squares of the values added so far.
   *
   * @return sum of the squares of the entries added so far.
   */
  public synchronized double getSumOfSq() {
    return _sumOfSq;
  }

  /**
   * Gets the sample standard deviation (n - 1 denominator) of the values added so far.
   *
   * @return standard deviation of the entries added so far; with fewer than two entries, the last
   *     cached value (0 after construction or {@link #clear()}).
   */
  public synchronized double getStandardDev() {
    if (_number > 1) {
      double variance = (_sumOfSq - ((_sum * _sum) / _number)) / (_number - 1);
      // Rounding error can push the numerator marginally below zero for (near-)constant
      // data; clamp so the square root does not turn that into NaN.
      _stdDev = Math.sqrt(Math.max(0.0, variance));
    }
    return _stdDev;
  }

  /**
   * Populates this object from data previously produced by {@code writeToDataOutput}.
   *
   * @param in stream to read from.
   * @throws IOException if the stream cannot be read; this object is left unchanged in that case.
   */
  public void readFromDataInput(InputStream in) throws IOException {
    DataInput di = new DataInputStream(in);
    readFromDataInput(di);
  }

  /**
   * Populates this object from data previously produced by {@code writeToDataOutput}.
   *
   * @param in data input to read from.
   * @throws IOException if the input cannot be read; this object is left unchanged in that case.
   */
  public void readFromDataInput(DataInput in) throws IOException {
    // Read the whole record before touching any field so that a failure halfway through
    // cannot leave a mixture of old and new values behind.
    double max = in.readDouble();
    double min = in.readDouble();
    long number = in.readLong();
    double mean = in.readDouble();
    double stdDev = in.readDouble();
    double sum = in.readDouble();
    double sumOfSq = in.readDouble();

    synchronized (this) {
      _max = max;
      _min = min;
      _number = number;
      _mean = mean;
      _stdDev = stdDev;
      _sum = sum;
      _sumOfSq = sumOfSq;
    }
  }

  /**
   * Writes the statistics to the output stream in a format that another {@code Stat} object can
   * read back with {@code readFromDataInput}.
   *
   * @param out stream to write to.
   * @throws IOException if the stream cannot be written.
   */
  public void writeToDataOutput(OutputStream out) throws IOException {
    DataOutput dout = new DataOutputStream(out);
    writeToDataOutput(dout);
  }

  /**
   * Writes the statistics to the data output in a format that another {@code Stat} object can
   * read back with {@code readFromDataInput}. The record is, in order: max, min, count (long),
   * mean, standard deviation, sum, sum of squares.
   *
   * @param out data output to write to.
   * @throws IOException if the output cannot be written.
   */
  public void writeToDataOutput(DataOutput out) throws IOException {
    double max;
    double min;
    long number;
    double mean;
    double stdDev;
    double sum;
    double sumOfSq;

    // Take a consistent snapshot under the lock, then perform the I/O outside of it.
    synchronized (this) {
      calculate(); // make sure the derived values match the running sums
      max = _max;
      min = _min;
      number = _number;
      mean = _mean;
      stdDev = _stdDev;
      sum = _sum;
      sumOfSq = _sumOfSq;
    }

    out.writeDouble(max);
    out.writeDouble(min);
    out.writeLong(number);
    out.writeDouble(mean);
    out.writeDouble(stdDev);
    out.writeDouble(sum);
    out.writeDouble(sumOfSq);
  }

  // ************************************
  private static final long serialVersionUID = 1L;
  private double _max = 0;
  private double _min = Double.MAX_VALUE;
  private long _number = 0;
  private double _mean = 0;
  private double _stdDev = 0;
  private double _sum = 0;
  private double _sumOfSq = 0;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/StateMachine.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/StateMachine.java b/commons/src/main/java/com/twitter/common/util/StateMachine.java new file mode 100644 index 0000000..4a559b5 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/StateMachine.java @@ -0,0 +1,586 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +import com.twitter.common.base.Closure; +import com.twitter.common.base.Closures; +import com.twitter.common.base.ExceptionalSupplier; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.twitter.common.base.MorePreconditions.checkNotBlank; + +/** + * Represents a state machine that is not necessarily a Finite State Machine. + * The caller may configure the state machine to permit only known state transitions, or to only + * disallow known state transitions (and permit unknown transitions). + * + * @param <T> THe type of objects that the caller uses to represent states. + * + * TODO(William Farner): Consider merging the stats-tracking ala PipelineStats into this. + */ +public class StateMachine<T> { + private static final Logger LOG = Logger.getLogger(StateMachine.class.getName()); + + private final String name; + + // Stores mapping from states to the states that the machine is allowed to transition into. 
+ private final Multimap<T, T> stateTransitions; + + private final Closure<Transition<T>> transitionCallback; + private final boolean throwOnBadTransition; + + private volatile T currentState; + private final Lock readLock; + private final Lock writeLock; + + + private StateMachine(String name, + T initialState, + Multimap<T, T> stateTransitions, + Closure<Transition<T>> transitionCallback, + boolean throwOnBadTransition) { + this.name = name; + this.currentState = initialState; + this.stateTransitions = stateTransitions; + this.transitionCallback = transitionCallback; + this.throwOnBadTransition = throwOnBadTransition; + + ReadWriteLock stateLock = new ReentrantReadWriteLock(true /* fair */); + readLock = stateLock.readLock(); + writeLock = stateLock.writeLock(); + } + + /** + * Gets the name of this state machine. + * + * @return The state machine name. + */ + public String getName() { + return name; + } + + /** + * Fetches the state that the machine is currently in. + * + * @return Current state. + */ + public T getState() { + return currentState; + } + + /** + * Checks that the current state is the {@code expectedState} and throws if it is not. + * + * @param expectedState The expected state + * @throws IllegalStateException if the current state is not the {@code expectedState}. + */ + public void checkState(T expectedState) { + checkState(ImmutableSet.of(expectedState)); + } + + /** + * Checks that the current state is one of the {@code allowedStates} and throws if it is not. + * + * @param allowedStates The allowed states. + * @throws IllegalStateException if the current state is not the {@code expectedState}. 
+ */ + public void checkState(Set<T> allowedStates) { + checkNotNull(allowedStates); + checkArgument(!allowedStates.isEmpty(), "At least one possible state must be provided."); + + readLock.lock(); + try { + if (!allowedStates.contains(currentState)) { + throw new IllegalStateException( + String.format("In state %s, expected to be in %s.", currentState, allowedStates)); + } + } finally { + readLock.unlock(); + } + } + + /** + * Executes the supplied {@code work} if the state machine is in the {@code expectedState}, + * postponing any concurrently requested {@link #transition(Object)} until after the execution of + * the work. + * + * @param expectedState The expected state the work should be performed in. + * @param work The work to perform in the {@code expectedState}. + * @param <O> The type returned by the unit of work. + * @param <E> The type of exception that may be thrown by the unit of work. + * @return The result of the unit of work if the current state is the {@code expectedState}. + * @throws IllegalStateException if the current state is not the {@code expectedState}. + * @throws E if the unit of work throws. + */ + public <O, E extends Exception> O doInState(T expectedState, ExceptionalSupplier<O, E> work) + throws E { + + checkNotNull(expectedState); + checkNotNull(work); + + readLock.lock(); + try { + checkState(expectedState); + return work.get(); + } finally { + readLock.unlock(); + } + } + + /** + * Transitions the machine into state {@code nextState}. + * + * @param nextState The state to move into. + * @throws IllegalStateTransitionException If the state transition is not allowed. + * @return {@code true} if the transition was allowed, {@code false} otherwise. 
+ */ + public boolean transition(T nextState) throws IllegalStateTransitionException { + boolean transitionAllowed = false; + + T currentCopy = currentState; + + writeLock.lock(); + try { + if (stateTransitions.containsEntry(currentState, nextState)) { + currentState = nextState; + transitionAllowed = true; + } else if (throwOnBadTransition) { + throw new IllegalStateTransitionException( + String.format("State transition from %s to %s is not allowed.", currentState, + nextState)); + } + } finally { + writeLock.unlock(); + } + + transitionCallback.execute(new Transition<T>(currentCopy, nextState, transitionAllowed)); + return transitionAllowed; + } + + public static class IllegalStateTransitionException extends IllegalStateException { + public IllegalStateTransitionException(String msg) { + super(msg); + } + } + + /** + * Convenience method to create a builder object. + * + * @param <T> Type of builder to create. + * @param name Name of the state machine to create a builder for. + * @return New builder. + */ + public static <T> Builder<T> builder(String name) { + return new Builder<T>(name); + } + + /** + * A state and its allowed transitions (if any) and (optional) callback. + * + * @param <T> State type. + */ + public static class Rule<T> { + private final T from; + private final Set<T> to; + private final Closure<Transition<T>> callback; + + private Rule(T from) { + this(from, ImmutableSet.<T>of()); + } + + private Rule(T from, Set<T> to) { + this(from, to, Closures.<Transition<T>>noop()); + } + + private Rule(T from, Set<T> to, Closure<Transition<T>> callback) { + this.from = checkNotNull(from); + this.to = checkNotNull(to); + this.callback = checkNotNull(callback); + } + + /** + * Associates a callback to be triggered after any attempt to transition from this state is + * made. + * + * @param callback Callback to signal. 
+ * @return A new rule that is identical to this rule, but with the provided + * callback + */ + public Rule<T> withCallback(Closure<Transition<T>> callback) { + return new Rule<T>(from, to, callback); + } + + /** + * A helper class when building a transition rule, to define the allowed transitions. + * + * @param <T> State type. + */ + public static class AllowedTransition<T> { + private final Rule<T> rule; + + private AllowedTransition(Rule<T> rule) { + this.rule = rule; + } + + /** + * Associates a single allowed transition with this state. + * + * @param state Allowed transition state. + * @return A new rule that identical to the original, but only allowing a transition to the + * provided state. + */ + public Rule<T> to(T state) { + return new Rule<T>(rule.from, ImmutableSet.<T>of(state), rule.callback); + } + + /** + * Associates multiple transitions with this state. + * + * @param state An allowed transition state. + * @param additionalStates Additional states that may be transitioned to. + * @return A new rule that identical to the original, but only allowing a transition to the + * provided states. + */ + public Rule<T> to(T state, T... additionalStates) { + return new Rule<T>(rule.from, ImmutableSet.copyOf(Lists.asList(state, additionalStates))); + } + + /** + * Allows no transitions to be performed from this state. + * + * @return The original rule. + */ + public Rule<T> noTransitions() { + return rule; + } + } + + /** + * Creates a new transition rule. + * + * @param state State to create and associate transitions with. + * @param <T> State type. + * @return A new transition rule builder. + */ + public static <T> AllowedTransition<T> from(T state) { + return new AllowedTransition<T>(new Rule<T>(state)); + } + } + + /** + * Builder to create a state machine. 
+ * + * @param <T> + */ + public static class Builder<T> { + private final String name; + private T initialState; + private final Multimap<T, T> stateTransitions = HashMultimap.create(); + private final List<Closure<Transition<T>>> transitionCallbacks = Lists.newArrayList(); + private boolean throwOnBadTransition = true; + + public Builder(String name) { + this.name = checkNotBlank(name); + } + + /** + * Sets the initial state for the state machine. + * + * @param state Initial state. + * @return A reference to the builder. + */ + public Builder<T> initialState(T state) { + checkNotNull(state); + initialState = state; + return this; + } + + /** + * Adds a state and its allowed transitions. + * + * @param rule The state and transition rule to add. + * @return A reference to the builder. + */ + public Builder<T> addState(Rule<T> rule) { + return addState(rule.callback, rule.from, rule.to); + } + + /** + * Adds a state and its allowed transitions. + * At least one transition state must be added, it is not necessary to explicitly add states + * that have no allowed transitions (terminal states). + * + * @param callback Callback to notify of any transition attempted from the state. + * @param state State to add. + * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(Closure<Transition<T>> callback, T state, + Set<T> transitionStates) { + checkNotNull(callback); + checkNotNull(state); + + Preconditions.checkArgument(Iterables.all(transitionStates, Predicates.notNull())); + + stateTransitions.putAll(state, transitionStates); + + @SuppressWarnings("unchecked") + Predicate<Transition<T>> filter = Transition.from(state); + onTransition(filter, callback); + return this; + } + + /** + * Varargs version of {@link #addState(com.twitter.common.base.Closure, Object, java.util.Set)}. + * + * @param callback Callback to notify of any transition attempted from the state. 
+ * @param state State to add. + * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(Closure<Transition<T>> callback, T state, + T... transitionStates) { + Set<T> states = ImmutableSet.copyOf(transitionStates); + Preconditions.checkArgument(Iterables.all(states, Predicates.notNull())); + + return addState(callback, state, states); + } + + /** + * Adds a state and its allowed transitions. + * At least one transition state must be added, it is not necessary to explicitly add states + * that have no allowed transitions (terminal states). + * + * @param state State to add. + * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(T state, T... transitionStates) { + return addState(Closures.<Transition<T>>noop(), state, transitionStates); + } + + private void onTransition(Predicate<Transition<T>> transitionFilter, + Closure<Transition<T>> handler) { + onAnyTransition(Closures.filter(transitionFilter, handler)); + } + + /** + * Adds a callback to be executed for every state transition, including invalid transitions + * that are attempted. + * + * @param handler Callback to notify of transition attempts. + * @return A reference to the builder. + */ + public Builder<T> onAnyTransition(Closure<Transition<T>> handler) { + transitionCallbacks.add(handler); + return this; + } + + /** + * Adds a log message for every state transition that is attempted. + * + * @return A reference to the builder. + */ + public Builder<T> logTransitions() { + return onAnyTransition(new Closure<Transition<T>>() { + @Override public void execute(Transition<T> transition) { + LOG.info(name + " state machine transition " + transition); + } + }); + } + + /** + * Allows the caller to specify whether {@link IllegalStateTransitionException} should be thrown + * when a bad state transition is attempted (the default behavior). 
+ * + * @param throwOnBadTransition Whether an exception should be thrown when a bad state transition + * is attempted. + * @return A reference to the builder. + */ + public Builder<T> throwOnBadTransition(boolean throwOnBadTransition) { + this.throwOnBadTransition = throwOnBadTransition; + return this; + } + + /** + * Builds the state machine. + * + * @return A reference to the prepared state machine. + */ + public StateMachine<T> build() { + Preconditions.checkState(initialState != null, "Initial state must be specified."); + checkArgument(!stateTransitions.isEmpty(), "No state transitions were specified."); + return new StateMachine<T>(name, + initialState, + stateTransitions, + Closures.combine(transitionCallbacks), + throwOnBadTransition); + } + } + + /** + * Representation of a state transition. + * + * @param <T> State type. + */ + public static class Transition<T> { + private final T from; + private final T to; + private final boolean allowed; + + public Transition(T from, T to, boolean allowed) { + this.from = checkNotNull(from); + this.to = checkNotNull(to); + this.allowed = allowed; + } + + private static <T> Function<Transition<T>, T> from() { + return new Function<Transition<T>, T>() { + @Override public T apply(Transition<T> transition) { + return transition.from; + } + }; + } + + private static <T> Function<Transition<T>, T> to() { + return new Function<Transition<T>, T>() { + @Override public T apply(Transition<T> transition) { + return transition.to; + } + }; + } + + private static <T> Predicate<Transition<T>> oneSideFilter( + Function<Transition<T>, T> extractor, final T... states) { + checkArgument(Iterables.all(Arrays.asList(states), Predicates.notNull())); + + return Predicates.compose(Predicates.in(ImmutableSet.copyOf(states)), extractor); + } + + /** + * Creates a predicate that returns {@code true} for transitions from the given states. + * + * @param states States to filter on. + * @param <T> State type. + * @return A from-state filter. 
+ */ + public static <T> Predicate<Transition<T>> from(final T... states) { + return oneSideFilter(Transition.<T>from(), states); + } + + /** + * Creates a predicate that returns {@code true} for transitions to the given states. + * + * @param states States to filter on. + * @param <T> State type. + * @return A to-state filter. + */ + public static <T> Predicate<Transition<T>> to(final T... states) { + return oneSideFilter(Transition.<T>to(), states); + } + + /** + * Creates a predicate that returns {@code true} for a specific state transition. + * + * @param from From state. + * @param to To state. + * @param <T> State type. + * @return A state transition filter. + */ + public static <T> Predicate<Transition<T>> transition(final T from, final T to) { + @SuppressWarnings("unchecked") + Predicate<Transition<T>> fromFilter = from(from); + @SuppressWarnings("unchecked") + Predicate<Transition<T>> toFilter = to(to); + return Predicates.and(fromFilter, toFilter); + } + + public T getFrom() { + return from; + } + + public T getTo() { + return to; + } + + public boolean isAllowed() { + return allowed; + } + + /** + * Checks whether this transition represents a state change, which means that the 'to' state is + * not equal to the 'from' state, and the transition is allowed. + * + * @return {@code true} if the state was changed, {@code false} otherwise. 
+ */ + public boolean isValidStateChange() { + return isAllowed() && !from.equals(to); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Transition)) { + return false; + } + + if (o == this) { + return true; + } + + Transition<?> other = (Transition) o; + return from.equals(other.from) && to.equals(other.to); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(from) + .append(to) + .toHashCode(); + } + + @Override + public String toString() { + String str = from.toString() + " -> " + to.toString(); + if (!isAllowed()) { + str += " (not allowed)"; + } + return str; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Timer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Timer.java b/commons/src/main/java/com/twitter/common/util/Timer.java new file mode 100644 index 0000000..d944f03 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/Timer.java @@ -0,0 +1,74 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util; + +import com.twitter.common.base.Commands; +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.base.ExceptionalSupplier; +import com.twitter.common.stats.SlidingStats; + +/** + * A utility for timing blocks of code. + * + * <p>TODO(John Sirois): consider instead: + * <T, E extends Exception> Pair<T, Long> doTimed(ExceptionalSupplier<T, E> timedWork) throws E + * or a subinterface of Command/Closure/Supplier/Function that exposes a timing method as other ways + * to factor in timing. + * + * @author John Sirois + */ +public final class Timer { + + /** + * Times the block of code encapsulated by {@code timedWork} recoding the result in {@code stat}. + * + * @param stat the stat to record the timing with + * @param timedWork the code to time + * @param <E> the type of exception {@code timedWork} may throw + * @throws E if {@code timedWork} throws + */ + public static <E extends Exception> void doTimed(SlidingStats stat, + final ExceptionalCommand<E> timedWork) throws E { + doTimed(stat, Commands.asSupplier(timedWork)); + } + + /** + * Times the block of code encapsulated by {@code timedWork} recoding the result in {@code stat}. 
+ * + * @param stat the stat to record the timing with + * @param timedWork the code to time + * @param <T> the type of result {@code timedWork} returns + * @param <E> the type of exception {@code timedWork} may throw + * @return the result of {@code timedWork} if it completes normally + * @throws E if {@code timedWork} throws + */ + public static <T, E extends Exception> T doTimed(SlidingStats stat, + ExceptionalSupplier<T, E> timedWork) throws E { + StartWatch timer = new StartWatch(); + timer.start(); + try { + return timedWork.get(); + } finally { + timer.stop(); + stat.accumulate(timer.getTime()); + } + } + + private Timer() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java b/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java new file mode 100644 index 0000000..779e0be --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java @@ -0,0 +1,77 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import com.google.common.base.Preconditions; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +/** + * A BackoffStrategy that implements truncated binary exponential backoff. + */ +public class TruncatedBinaryBackoff implements BackoffStrategy { + private final long initialBackoffMs; + private final long maxBackoffIntervalMs; + private final boolean stopAtMax; + + /** + * Creates a new TruncatedBinaryBackoff that will start by backing off for {@code initialBackoff} + * and then backoff of twice as long each time its called until reaching the {@code maxBackoff} at + * which point shouldContinue() will return false and any future backoffs will always wait for + * that amount of time. + * + * @param initialBackoff the intial amount of time to backoff + * @param maxBackoff the maximum amount of time to backoff + * @param stopAtMax whether shouldContinue() returns false when the max is reached + */ + public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff, + Amount<Long, Time> maxBackoff, boolean stopAtMax) { + Preconditions.checkNotNull(initialBackoff); + Preconditions.checkNotNull(maxBackoff); + Preconditions.checkArgument(initialBackoff.getValue() > 0); + Preconditions.checkArgument(maxBackoff.compareTo(initialBackoff) >= 0); + initialBackoffMs = initialBackoff.as(Time.MILLISECONDS); + maxBackoffIntervalMs = maxBackoff.as(Time.MILLISECONDS); + this.stopAtMax = stopAtMax; + } + + /** + * Same as main constructor, but this will always return true from shouldContinue(). 
+ * + * @param initialBackoff the intial amount of time to backoff + * @param maxBackoff the maximum amount of time to backoff + */ + public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) { + this(initialBackoff, maxBackoff, false); + } + + @Override + public long calculateBackoffMs(long lastBackoffMs) { + Preconditions.checkArgument(lastBackoffMs >= 0); + long backoff = (lastBackoffMs == 0) ? initialBackoffMs + : Math.min(maxBackoffIntervalMs, lastBackoffMs * 2); + return backoff; + } + + @Override + public boolean shouldContinue(long lastBackoffMs) { + Preconditions.checkArgument(lastBackoffMs >= 0); + boolean stop = stopAtMax && (lastBackoffMs >= maxBackoffIntervalMs); + + return !stop; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/Cache.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/caching/Cache.java b/commons/src/main/java/com/twitter/common/util/caching/Cache.java new file mode 100644 index 0000000..d37e601 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/caching/Cache.java @@ -0,0 +1,49 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util.caching; + +/** + * Definition of basic caching functionality. Cache keys and values are expected to always be + * valid, non-null values. + * + * @author William Farner + */ +public interface Cache<K, V> { + + /** + * Fetches a value from the cache. + * + * @param key The key for the value to fetch, must not be {@code null}. + * @return The cached value corresponding with {@code key}, or {@code null} if no entry exists. + */ + public V get(K key); + + /** + * Stores a key-value pair in the cache. + * + * @param key The key to store, must not be {@code null}. + * @param value The value to store, must not be {@code null}. + */ + public void put(K key, V value); + + /** + * Deletes an entry from the cache. + * + * @param key Key for the value to delete, must not be {@code null}. + */ + public void delete(K key); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java b/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java new file mode 100644 index 0000000..2a07a6e --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java @@ -0,0 +1,265 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util.caching; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * A proxy class that handles caching of return values for method calls to a wrapped object. + * + * Example usage: + * + * Foo uncached = new Foo(); + * CachingMethodProxy<Foo> methodProxy = CachingMethodProxy.proxyFor(uncached, Foo.class); + * Foo foo = methodProxy.getCachingProxy(); + * methodProxy.cache(foo.doBar(), lruCache1) + * .cache(foo.doBaz(), lruCache2) + * .prepare(); + * + * @author William Farner + */ +public class CachingMethodProxy<T> { + + // Dummy return values to return when in recording state. 
+ private static final Map<Class<?>, Object> EMPTY_RETURN_VALUES = + ImmutableMap.<Class<?>, Object>builder() + .put(Boolean.TYPE, Boolean.FALSE) + .put(Byte.TYPE, Byte.valueOf((byte) 0)) + .put(Short.TYPE, Short.valueOf((short) 0)) + .put(Character.TYPE, Character.valueOf((char)0)) + .put(Integer.TYPE, Integer.valueOf(0)) + .put(Long.TYPE, Long.valueOf(0)) + .put(Float.TYPE, Float.valueOf(0)) + .put(Double.TYPE, Double.valueOf(0)) + .build(); + private static final Map<Class<?>, Class<?>> AUTO_BOXING_MAP = + ImmutableMap.<Class<?>, Class<?>>builder() + .put(Boolean.TYPE, Boolean.class) + .put(Byte.TYPE, Byte.class) + .put(Short.TYPE, Short.class) + .put(Character.TYPE, Character.class) + .put(Integer.TYPE, Integer.class) + .put(Long.TYPE, Long.class) + .put(Float.TYPE, Float.class) + .put(Double.TYPE, Double.class) + .build(); + + // The uncached resource, whose method calls are deemed to be expensive and cacheable. + private final T uncached; + + // The methods that are cached, and the caches themselves. + private final Map<Method, MethodCache> methodCaches = Maps.newHashMap(); + private final Class<T> type; + + private Method lastMethodCall = null; + private boolean recordMode = true; + + /** + * Creates a new caching method proxy that will wrap an object and cache for the provided methods. + * + * @param uncached The uncached object that will be reverted to when a cache entry is not present. + */ + private CachingMethodProxy(T uncached, Class<T> type) { + this.uncached = Preconditions.checkNotNull(uncached); + this.type = Preconditions.checkNotNull(type); + Preconditions.checkArgument(type.isInterface(), "The proxied type must be an interface."); + } + + private static Object invokeMethod(Object subject, Method method, Object[] args) + throws Throwable { + try { + return method.invoke(subject, args); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot access " + subject.getClass() + "." 
+ method, e); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } + + /** + * A cached method and its caching control structures. + * + * @param <K> Cache key type. + * @param <V> Cache value type, expected to match the return type of the method. + */ + private static class MethodCache<K, V> { + private final Method method; + private final Cache<K, V> cache; + private final Function<Object[], K> keyBuilder; + private final Predicate<V> entryFilter; + + MethodCache(Method method, Cache<K, V> cache, Function<Object[], K> keyBuilder, + Predicate<V> entryFilter) { + this.method = method; + this.cache = cache; + this.keyBuilder = keyBuilder; + this.entryFilter = entryFilter; + } + + V doInvoke(Object uncached, Object[] args) throws Throwable { + K key = keyBuilder.apply(args); + + V cachedValue = cache.get(key); + + if (cachedValue != null) return cachedValue; + + Object fetched = invokeMethod(uncached, method, args); + + if (fetched == null) return null; + + @SuppressWarnings("unchecked") + V typedValue = (V) fetched; + + if (entryFilter.apply(typedValue)) cache.put(key, typedValue); + + return typedValue; + } + } + + /** + * Creates a new builder for the given type. + * + * @param uncached The uncached object that should be insulated by caching. + * @param type The interface that a proxy should be created for. + * @param <T> Type parameter to the proxied class. + * @return A new builder. + */ + public static <T> CachingMethodProxy<T> proxyFor(T uncached, Class<T> type) { + return new CachingMethodProxy<T>(uncached, type); + } + + @SuppressWarnings("unchecked") + public T getCachingProxy() { + return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[] { type }, + new InvocationHandler() { + @Override public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return doInvoke(method, args); + } + }); + } + + private Object doInvoke(Method method, Object[] args) throws Throwable { + return recordMode ? 
recordCall(method) : cacheRequest(method, args); + } + + private Object recordCall(Method method) { + Preconditions.checkArgument(method.getReturnType() != Void.TYPE, + "Void return methods cannot be cached: " + method); + Preconditions.checkArgument(method.getParameterTypes().length > 0, + "Methods with zero arguments cannot be cached: " + method); + Preconditions.checkState(lastMethodCall == null, + "No cache instructions provided for call to: " + lastMethodCall); + + lastMethodCall = method; + + Class<?> returnType = method.getReturnType(); + return returnType.isPrimitive() ? EMPTY_RETURN_VALUES.get(returnType) : null; + } + + private Object cacheRequest(Method method, Object[] args) throws Throwable { + MethodCache cache = methodCaches.get(method); + + // Check if we are caching for this method. + if (cache == null) return invokeMethod(uncached, method, args); + + return cache.doInvoke(uncached, args); + } + + /** + * Instructs the proxy that cache setup is complete, and the proxy instance should begin caching + * and delegating uncached calls. After this is called, any subsequent calls to any of the + * cache setup methods will result in an {@link IllegalStateException}. + */ + public void prepare() { + Preconditions.checkState(!methodCaches.isEmpty(), "At least one method must be cached."); + Preconditions.checkState(recordMode, "prepare() may only be invoked once."); + + recordMode = false; + } + + public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache) { + return cache(value, cache, Predicates.<V>alwaysTrue()); + } + + public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache, + Predicate<V> valueFilter) { + return cache(value, cache, DEFAULT_KEY_BUILDER, valueFilter); + } + + public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache, + Function<Object[], K> keyBuilder) { + // Get the last method call and declare it the cached method. 
+ return cache(value, cache, keyBuilder, Predicates.<V>alwaysTrue()); + } + + public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache, + Function<Object[], K> keyBuilder, Predicate<V> valueFilter) { + Preconditions.checkNotNull(cache); + Preconditions.checkNotNull(keyBuilder); + Preconditions.checkNotNull(valueFilter); + + Preconditions.checkState(recordMode, "Cache setup is not allowed after prepare() is called."); + + // Get the last method call and declare it the cached method. + Preconditions.checkState(lastMethodCall != null, "No method call captured to be cached."); + + Class<?> returnType = lastMethodCall.getReturnType(); + + Preconditions.checkArgument(returnType != Void.TYPE, + "Cannot cache results from void method: " + lastMethodCall); + + if (returnType.isPrimitive()) { + // If a primitive type is returned, we need to make sure that the cache holds the boxed + // type for the primitive. + returnType = AUTO_BOXING_MAP.get(returnType); + } + + // TODO(William Farner): Figure out a simple way to make this possible. Right now, since the proxy + // objects return null, we get a null here and can't check the type. 
+ //Preconditions.checkArgument(value.getClass() == returnType, + // String.format("Cache value type '%s' does not match method return type '%s'", + // value.getClass(), lastMethodCall.getReturnType())); + + methodCaches.put(lastMethodCall, new MethodCache<K, V>(lastMethodCall, cache, keyBuilder, + valueFilter)); + + lastMethodCall = null; + + return this; + } + + private static final Function<Object[], List> DEFAULT_KEY_BUILDER = + new Function<Object[], List>() { + @Override public List apply(Object[] args) { + return Arrays.asList(args); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java b/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java new file mode 100644 index 0000000..f84bec3 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java @@ -0,0 +1,173 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util.caching; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.twitter.common.base.Closure; +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.collections.Pair; +import com.twitter.common.stats.Stats; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A cache with a fixed maximum size, evicting items that were used least-recently. + * WARNING: This is not thread-safe. If you wish to get a thread-safe version of a constructed + * LRUCache, you must wrap it with {@link Collections#synchronizedMap(java.util.Map)}. + * + * @author William Farner + */ +public class LRUCache<K, V> implements Cache<K, V> { + + private Map<K, V> map; + + private final AtomicLong accesses; + private final AtomicLong misses; + + /** + * Creates a new bounded cache with the given load factor. + * + * @param name Unique name for this cache. + * @param maxCapacity Maximum capacity for the cache, after which items will be evicted. + * @param loadFactor Load factor for the cache. + * @param makeSynchronized Whether the underlying map should be synchronized. + * @param evictionListener Listener to be notified when an element is evicted, or {@code null} if + * eviction notifications are not needed. + */ + private LRUCache(final String name, final int maxCapacity, float loadFactor, + boolean makeSynchronized, final Closure<Pair<K, V>> evictionListener) { + map = new LinkedHashMap<K, V>(maxCapacity, loadFactor, true /* Access order. 
*/) { + @Override public boolean removeEldestEntry(Map.Entry<K, V> entry) { + boolean evict = size() > maxCapacity; + if (evict && evictionListener != null) { + evictionListener.execute(Pair.of(entry.getKey(), entry.getValue())); + } + return evict; + } + }; + + if (makeSynchronized) { + map = Collections.synchronizedMap(map); + } + + accesses = Stats.exportLong(name + "_lru_cache_accesses"); + misses = Stats.exportLong(name + "_lru_cache_misses"); + } + + public static <K, V> Builder<K, V> builder() { + return new Builder<K, V>(); + } + + public static class Builder<K, V> { + private String name = null; + + private int maxSize = 1000; + + // Sadly, LinkedHashMap doesn't expose this, so the default is pulled from the javadoc. + private float loadFactor = 0.75F; + + private boolean makeSynchronized = true; + + private Closure<Pair<K, V>> evictionListener = null; + + public Builder<K, V> name(String name) { + this.name = MorePreconditions.checkNotBlank(name); + return this; + } + + public Builder<K, V> maxSize(int maxSize) { + Preconditions.checkArgument(maxSize > 0); + this.maxSize = maxSize; + return this; + } + + public Builder<K, V> loadFactor(float loadFactor) { + this.loadFactor = loadFactor; + return this; + } + + public Builder<K, V> makeSynchronized(boolean makeSynchronized) { + this.makeSynchronized = makeSynchronized; + return this; + } + + public Builder<K, V> evictionListener(Closure<Pair<K, V>> evictionListener) { + this.evictionListener = evictionListener; + return this; + } + + public LRUCache<K, V> build() { + return new LRUCache<K, V>(name, maxSize, loadFactor, makeSynchronized, evictionListener); + } + } + + @Override + public V get(K key) { + accesses.incrementAndGet(); + V value = map.get(key); + if (value == null) { + misses.incrementAndGet(); + } + return value; + } + + @Override + public void put(K key, V value) { + map.put(key, value); + } + + @Override + public void delete(K key) { + map.remove(key); + } + + public int size() { + return 
map.size(); + } + + @Override + public String toString() { + return String.format("size: %d, accesses: %s, misses: %s", + map.size(), + accesses, + misses); + } + + public Collection<V> copyValues() { + synchronized(map) { + return ImmutableList.copyOf(map.values()); + } + } + + public long getAccesses() { + return accesses.longValue(); + } + + public long getMisses() { + return misses.longValue(); + } + + public double getHitRate() { + double numAccesses = accesses.longValue(); + return numAccesses == 0 ? 0 : (numAccesses - misses.longValue()) / numAccesses; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java new file mode 100644 index 0000000..9793276 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java @@ -0,0 +1,59 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util.concurrent; + +import com.google.common.base.Preconditions; +import com.twitter.common.util.BackoffStrategy; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A {@link RetryingFutureTask} that will resubmit itself to a work queue with a backoff. + * + * @author William Farner + */ +public class BackingOffFutureTask extends RetryingFutureTask { + private final ScheduledExecutorService executor; + private final BackoffStrategy backoffStrategy; + private long backoffMs = 0; + + /** + * Creates a new retrying future task that will execute a unit of work until successfully + * completed, or the retry limit has been reached. + * + * @param executor The executor service to resubmit the task to upon failure. + * @param callable The unit of work. The work is considered successful when {@code true} is + * returned. It may return {@code false} or throw an exception when + * unsueccessful. + * @param maxRetries The maximum number of times to retry the task. + * @param backoffStrategy Strategy to use for determining backoff duration. 
+ */ + public BackingOffFutureTask(ScheduledExecutorService executor, Callable<Boolean> callable, + int maxRetries, BackoffStrategy backoffStrategy) { + super(executor, callable, maxRetries); + this.executor = executor; + this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy); + } + + @Override + protected void retry() { + backoffMs = backoffStrategy.calculateBackoffMs(backoffMs); + executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java new file mode 100644 index 0000000..056282c --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java @@ -0,0 +1,81 @@ +package com.twitter.common.util.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; + +/** + * An executor service that delegates to another executor service, invoking an uncaught + * exception handler if any exceptions are thrown in submitted work. 
/**
 * An executor service that delegates to another executor service, invoking an uncaught
 * exception handler if any exceptions are thrown in submitted work.
 *
 * <p>Every task accepted by the {@code submit}, {@code invokeAll} and {@code invokeAny} methods
 * is passed through {@link TaskConverter} together with the handler supplier before being
 * forwarded to the superclass implementation.
 *
 * <p>NOTE(review): {@code execute(Runnable)} is not overridden here, whereas
 * {@code ExceptionHandlingScheduledExecutorService} does wrap it; confirm that is intentional.
 *
 * @see MoreExecutors
 */
class ExceptionHandlingExecutorService extends ForwardingExecutorService<ExecutorService> {
  // Supplies the handler that the wrapped tasks are given.
  private final Supplier<Thread.UncaughtExceptionHandler> handler;

  /**
   * Creates an executor service that forwards to {@code delegate}.
   *
   * @param delegate the executor service that actually runs the work
   * @param handler supplier of the handler passed to {@link TaskConverter}; must not be
   *     {@code null}
   */
  ExceptionHandlingExecutorService(
      ExecutorService delegate,
      Supplier<Thread.UncaughtExceptionHandler> handler) {

    super(delegate);
    this.handler = Preconditions.checkNotNull(handler);
  }

  /** Submits {@code task} after wrapping it with {@code TaskConverter.alertingCallable}. */
  @Override
  public <T> Future<T> submit(Callable<T> task) {
    return super.submit(TaskConverter.alertingCallable(task, handler));
  }

  /** Submits {@code task} after wrapping it with {@code TaskConverter.alertingRunnable}. */
  @Override
  public <T> Future<T> submit(Runnable task, T result) {
    return super.submit(TaskConverter.alertingRunnable(task, handler), result);
  }

  /** Submits {@code task} after wrapping it with {@code TaskConverter.alertingRunnable}. */
  @Override
  public Future<?> submit(Runnable task) {
    return super.submit(TaskConverter.alertingRunnable(task, handler));
  }

  /** Invokes all {@code tasks} after wrapping them with {@code TaskConverter.alertingCallables}. */
  @Override
  public <T> List<Future<T>> invokeAll(
      Collection<? extends Callable<T>> tasks) throws InterruptedException {

    return super.invokeAll(TaskConverter.alertingCallables(tasks, handler));
  }

  /** Timed variant of {@link #invokeAll(Collection)}; the tasks are wrapped the same way. */
  @Override
  public <T> List<Future<T>> invokeAll(
      Collection<? extends Callable<T>> tasks,
      long timeout,
      TimeUnit unit) throws InterruptedException {

    return super.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
  }

  /** Invokes any of {@code tasks} after wrapping them with {@code TaskConverter.alertingCallables}. */
  @Override
  public <T> T invokeAny(
      Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {

    return super.invokeAny(TaskConverter.alertingCallables(tasks, handler));
  }

  /** Timed variant of {@link #invokeAny(Collection)}; the tasks are wrapped the same way. */
  @Override
  public <T> T invokeAny(
      Collection<? extends Callable<T>> tasks,
      long timeout,
      TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

    return super.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
  }
}
+ * + * @see MoreExecutors + */ +class ExceptionHandlingScheduledExecutorService + extends ForwardingExecutorService<ScheduledExecutorService> + implements ScheduledExecutorService { + private final Supplier<Thread.UncaughtExceptionHandler> handler; + + /** + * Construct a {@link ScheduledExecutorService} with a supplier of + * {@link Thread.UncaughtExceptionHandler} that handles exceptions thrown from submitted work. + */ + ExceptionHandlingScheduledExecutorService( + ScheduledExecutorService delegate, + Supplier<Thread.UncaughtExceptionHandler> handler) { + super(delegate); + this.handler = handler; + } + + @Override + public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) { + return delegate.schedule(TaskConverter.alertingRunnable(runnable, handler), delay, timeUnit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit timeUnit) { + return delegate.schedule(TaskConverter.alertingCallable(callable, handler), delay, timeUnit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate( + Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) { + return delegate.scheduleAtFixedRate( + TaskConverter.alertingRunnable(runnable, handler), initialDelay, period, timeUnit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay( + Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) { + return delegate.scheduleWithFixedDelay( + TaskConverter.alertingRunnable(runnable, handler), initialDelay, delay, timeUnit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return delegate.submit(TaskConverter.alertingCallable(task, handler)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return delegate.submit(TaskConverter.alertingRunnable(task, handler), result); + } + + @Override + public Future<?> submit(Runnable task) { + return delegate.submit(TaskConverter.alertingRunnable(task, handler)); + } + + 
@Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(TaskConverter.alertingRunnable(command, handler)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java new file mode 100644 index 0000000..0e85bad --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java @@ -0,0 +1,74 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.google.common.base.Preconditions;

import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;

/**
 * A {@link Command} that shuts an {@link ExecutorService} down in two phases: an orderly
 * {@code shutdown()}, followed by {@code shutdownNow()} if the pool has not terminated within the
 * grace period.
 *
 * @author John Sirois
 */
public class ExecutorServiceShutdown implements Command {
  private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName());

  private final ExecutorService executor;
  private final Amount<Long, Time> gracePeriod;

  /**
   * Creates a command that, when executed, attempts a graceful shutdown of {@code executor}.
   * With a grace period of zero or less the shutdown requests are still issued, but nothing is
   * waited for after them.
   *
   * @param executor The executor service to shut down; must not be null.
   * @param gracePeriod The longest time to wait after each shutdown request before moving on to
   *     the next phase; must not be null.
   */
  public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) {
    this.executor = Preconditions.checkNotNull(executor);
    this.gracePeriod = Preconditions.checkNotNull(gracePeriod);
  }

  @Override
  public void execute() {
    // Phase 1: refuse new submissions, let queued and running tasks finish.
    executor.shutdown();
    try {
      if (awaitGracePeriod()) {
        return;
      }

      // Phase 2: cancel whatever is still running and give it one more grace period to react.
      executor.shutdownNow();
      if (!awaitGracePeriod()) {
        LOG.warning("Pool did not terminate");
      }
    } catch (InterruptedException ie) {
      // The waiting thread was interrupted: (re-)cancel the work and keep the interrupt flag set
      // so callers further up the stack can observe it.
      executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Blocks for up to one grace period waiting for the executor to terminate.
   *
   * @return {@code true} if the executor terminated within the grace period
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private boolean awaitGracePeriod() throws InterruptedException {
    return executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java new file mode 100644 index 0000000..8b73ab0 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java @@ -0,0 +1,104 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc.
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util.concurrent; + +import com.google.common.base.Preconditions; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An executor service that forwards all calls to another executor service. Subclasses should + * override one or more methods to modify the behavior of the backing executor service as desired + * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. 
+ * + * @author John Sirois + */ +public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService { + protected final T delegate; + + public ForwardingExecutorService(T delegate) { + Preconditions.checkNotNull(delegate); + this.delegate = delegate; + } + + public void shutdown() { + delegate.shutdown(); + } + + public List<Runnable> shutdownNow() { + return delegate.shutdownNow(); + } + + public boolean isShutdown() { + return delegate.isShutdown(); + } + + public boolean isTerminated() { + return delegate.isTerminated(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + public <T> Future<T> submit(Callable<T> task) { + return delegate.submit(task); + } + + public <T> Future<T> submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + public Future<?> submit(Runnable task) { + return delegate.submit(task); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + + return delegate.invokeAll(tasks); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + + return delegate.invokeAll(tasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + + return delegate.invokeAny(tasks); + } + + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + return delegate.invokeAny(tasks, timeout, unit); + } + + public void execute(Runnable command) { + delegate.execute(command); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java new file mode 100644 index 0000000..2591938 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java @@ -0,0 +1,109 @@ +package com.twitter.common.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +/** + * Utility class that provides factory functions to decorate + * {@link java.util.concurrent.ExecutorService}s. + */ +public final class MoreExecutors { + private MoreExecutors() { + // utility + } + + /** + * Returns a {@link ExecutorService} that passes uncaught exceptions to + * {@link java.lang.Thread.UncaughtExceptionHandler}. + * <p> + * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of + * unchecked exceptions thrown from submitted work. Some users are surprised to find that + * even the default uncaught exception handler is not invoked. + * + * @param executorService delegate + * @param uncaughtExceptionHandler exception handler that will receive exceptions generated + * from executor tasks. 
+ * @return a decorated executor service + */ + public static ExecutorService exceptionHandlingExecutor( + ExecutorService executorService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + + Preconditions.checkNotNull(uncaughtExceptionHandler); + return new ExceptionHandlingExecutorService( + executorService, Suppliers.ofInstance(uncaughtExceptionHandler)); + } + + /** + * Returns a {@link ExecutorService} that passes uncaught exceptions to + * a handler returned by Thread.currentThread().getDefaultUncaughtExceptionHandler() + * at the time the exception is thrown. + * + * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService, + * Thread.UncaughtExceptionHandler) + * @param executorService delegate + * @return a decorated executor service + */ + public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) { + return new ExceptionHandlingExecutorService( + executorService, + new Supplier<Thread.UncaughtExceptionHandler>() { + @Override + public Thread.UncaughtExceptionHandler get() { + return Thread.currentThread().getUncaughtExceptionHandler(); + } + }); + } + + /** + * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to + * {@link java.lang.Thread.UncaughtExceptionHandler}. + * <p> + * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of + * unchecked exceptions thrown from submitted work. Some users are surprised to find that + * even the default uncaught exception handler is not invoked. + * + * @param executorService delegate + * @param uncaughtExceptionHandler exception handler that will receive exceptions generated + * from executor tasks. 
+ * @return a decorated executor service + */ + public static ScheduledExecutorService exceptionHandlingExecutor( + ScheduledExecutorService executorService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + + Preconditions.checkNotNull(uncaughtExceptionHandler); + return new ExceptionHandlingScheduledExecutorService( + executorService, + Suppliers.ofInstance(uncaughtExceptionHandler)); + } + + /** + * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to + * a handler returned by Thread.currentThread().getDefaultUncaughtExceptionHandler() + * at the time the exception is thrown. + * + * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService, + * Thread.UncaughtExceptionHandler) + * @param executorService delegate + * @return a decorated executor service + */ + public static ScheduledExecutorService exceptionHandlingExecutor( + ScheduledExecutorService executorService) { + + return new ExceptionHandlingScheduledExecutorService( + executorService, + new Supplier<Thread.UncaughtExceptionHandler>() { + @Override + public Thread.UncaughtExceptionHandler get() { + return Thread.currentThread().getUncaughtExceptionHandler(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java new file mode 100644 index 0000000..808f3a9 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java @@ -0,0 +1,84 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util.concurrent;

import com.google.common.base.Preconditions;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A future task that supports retries by resubmitting itself to an {@link ExecutorService}.
 * <p>
 * The future completes with {@code true} as soon as one attempt succeeds, and with {@code false}
 * once the retry limit has been exhausted, so callers blocked in {@code get()} are always
 * released.
 *
 * @author William Farner
 */
public class RetryingFutureTask extends FutureTask<Boolean> {
  private static final Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName());

  protected final ExecutorService executor;
  protected final int maxRetries;
  protected int numRetries = 0;
  protected final Callable<Boolean> callable;

  /**
   * Creates a new retrying future task that will execute a unit of work until successfully
   * completed, or the retry limit has been reached.
   *
   * @param executor The executor service to resubmit the task to upon failure.
   * @param callable The unit of work.  The work is considered successful when {@code true} is
   *    returned.  It may return {@code false} or throw an exception when unsuccessful.
   * @param maxRetries The maximum number of times to retry the task.
   */
  public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) {
    super(callable);
    this.callable = Preconditions.checkNotNull(callable);
    this.executor = Preconditions.checkNotNull(executor);
    this.maxRetries = maxRetries;
  }

  /**
   * Invokes a retry of this task.
   */
  protected void retry() {
    executor.execute(this);
  }

  /**
   * Runs one attempt of the unit of work. On success the future is completed with {@code true};
   * on failure the task is resubmitted via {@link #retry()} until {@code maxRetries} retries have
   * been used, at which point the future is completed with {@code false}.
   */
  @Override
  public void run() {
    // This override bypasses FutureTask's own state check, so honour cancellation explicitly:
    // a cancelled task must neither run the work again nor resubmit itself.
    if (isCancelled()) {
      return;
    }

    boolean success = false;
    try {
      success = callable.call();
    } catch (Exception e) {
      LOG.log(Level.WARNING, "Exception while executing task.", e);
    }

    if (success) {
      set(true);
      return;
    }

    numRetries++;
    if (numRetries > maxRetries) {
      LOG.severe("Task did not complete after " + maxRetries + " retries, giving up.");
      // Complete the future so that callers waiting in get() are released instead of blocking
      // forever on a task that will never run again.
      set(false);
    } else {
      LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")");
      retry();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java new file mode 100644 index 0000000..9f1fcc2 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java @@ -0,0 +1,80 @@ +package com.twitter.common.util.concurrent; + +import java.util.Collection; +import java.util.concurrent.Callable; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; + +final class TaskConverter { + private TaskConverter() { + // utility + } + + /** + * Returns a wrapped {@link Runnable} that passes uncaught exceptions thrown from the + * original Runnable to {@link Thread.UncaughtExceptionHandler}.
+ * + * @param runnable runnable to be wrapped + * @param handler exception handler that will receive exceptions generated in the runnable + * @return wrapped runnable + */ + static Runnable alertingRunnable( + final Runnable runnable, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + handler.get().uncaughtException(Thread.currentThread(), t); + throw Throwables.propagate(t); + } + } + }; + } + + /** + * Returns a wrapped {@link java.util.concurrent.Callable} that passes uncaught exceptions + * thrown from the original Callable to {@link Thread.UncaughtExceptionHandler}. + * + * @param callable callable to be wrapped + * @param handler exception handler that will receive exceptions generated in the callable + * @return wrapped callable + */ + static <V> Callable<V> alertingCallable( + final Callable<V> callable, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return new Callable<V>() { + @Override + public V call() throws Exception { + try { + return callable.call(); + } catch (Throwable t) { + handler.get().uncaughtException(Thread.currentThread(), t); + throw Throwables.propagate(t); + } + } + }; + } + + /* + * Calls #alertingCallable on a collection of callables + */ + static <V> Collection<? extends Callable<V>> alertingCallables( + Collection<? extends Callable<V>> callables, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return Collections2.transform(callables, new Function<Callable<V>, Callable<V>>() { + @Override + public Callable<V> apply(Callable<V> callable) { + return alertingCallable(callable, handler); + } + }); + } +}
