http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java b/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java new file mode 100644 index 0000000..e39b688 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java @@ -0,0 +1,174 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// =================================================================================================

package com.twitter.common.thrift.testing;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.thrift.TBase;
import org.apache.thrift.TBaseHelper;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TStruct;
import org.apache.thrift.protocol.TType;

import java.util.Map;
import java.util.Map.Entry;

/**
 * Hand-coded thrift types for use in tests.
 *
 * @author John Sirois
 */
public class TestThriftTypes {

  /**
   * Field identifiers for {@link Struct}.  Every instance registers itself in a static lookup
   * table keyed by its thrift field id, which backs {@link #forId(int)}.
   */
  public static class Field implements TFieldIdEnum {
    // Declared ahead of the constants below so the map exists when their constructors run.
    private static final Map<Short, Field> FIELDS_BY_ID = Maps.newHashMap();

    /**
     * Looks up the field registered under the given thrift field id.
     *
     * @param id the thrift field id; narrowed to a {@code short} for the lookup
     * @return the matching field, never {@code null}
     * @throws IllegalArgumentException if no field is registered under {@code id}
     */
    public static Field forId(int id) {
      Field field = FIELDS_BY_ID.get((short) id);
      Preconditions.checkArgument(field != null, "No Field with id: %s", id);
      return field;
    }

    public static final Field NAME = new Field((short) 0, "name");
    public static final Field VALUE = new Field((short) 1, "value");

    private final short fieldId;
    private final String fieldName;

    private Field(short fieldId, String fieldName) {
      this.fieldId = fieldId;
      this.fieldName = fieldName;
      FIELDS_BY_ID.put(fieldId, this);
    }

    @Override
    public short getThriftFieldId() {
      return fieldId;
    }

    @Override
    public String getFieldName() {
      return fieldName;
    }
  }

  /**
   * A minimal struct that stores its values in a map keyed by {@link Field} and serializes every
   * value as a thrift string.
   */
  public static class Struct implements TBase<Struct, Field> {
    private final Map<Field, Object> fields = Maps.newHashMap();

    /** Creates a struct with no fields set. */
    public Struct() {}

    /**
     * Creates a struct with both fields stored.  A {@code null} argument is stored as-is, so
     * {@link #isSet(Field)} reports the field as set even though its value is {@code null}.
     *
     * @param name value for {@link Field#NAME}
     * @param value value for {@link Field#VALUE}
     */
    public Struct(String name, String value) {
      fields.put(Field.NAME, name);
      fields.put(Field.VALUE, value);
    }

    /**
     * @return the value stored under {@link Field#NAME}, or {@code null} if absent
     */
    public String getName() {
      // Casting null is legal, so no explicit null check is needed.
      return (String) getFieldValue(Field.NAME);
    }

    /**
     * @return the value stored under {@link Field#VALUE}, or {@code null} if absent
     */
    public String getValue() {
      return (String) getFieldValue(Field.VALUE);
    }

    /**
     * Reads fields from the protocol until the stop marker, treating every field as a string.
     *
     * @throws IllegalArgumentException if a field id read from the protocol is not registered
     */
    @Override
    public void read(TProtocol tProtocol) throws TException {
      tProtocol.readStructBegin();
      TField field;
      while ((field = tProtocol.readFieldBegin()).type != TType.STOP) {
        fields.put(fieldForId(field.id), tProtocol.readString());
        tProtocol.readFieldEnd();
      }
      tProtocol.readStructEnd();
    }

    /**
     * Writes every stored, non-null field as a thrift string, in the map's iteration order.
     *
     * <p>Fields whose stored value is {@code null} are skipped rather than written; previously
     * they caused a {@link NullPointerException} here.
     */
    @Override
    public void write(TProtocol tProtocol) throws TException {
      // NOTE(review): the struct is announced as "Field" although the class is Struct; kept
      // unchanged because the name is visible to protocols that serialize struct names.
      tProtocol.writeStructBegin(new TStruct("Field"));
      for (Entry<Field, Object> entry : fields.entrySet()) {
        Object value = entry.getValue();
        if (value == null) {
          // Nothing to serialize for a null value.
          continue;
        }
        Field field = entry.getKey();
        tProtocol.writeFieldBegin(
            new TField(field.getFieldName(), TType.STRING, field.getThriftFieldId()));
        tProtocol.writeString(value.toString());
        tProtocol.writeFieldEnd();
      }
      tProtocol.writeFieldStop();
      tProtocol.writeStructEnd();
    }

    /**
     * @return {@code true} if the backing map has an entry for {@code field}, even a null one
     */
    @Override
    public boolean isSet(Field field) {
      return fields.containsKey(field);
    }

    @Override
    public Object getFieldValue(Field field) {
      return fields.get(field);
    }

    @Override
    public void setFieldValue(Field field, Object o) {
      fields.put(field, o);
    }

    /**
     * Copies the field map into a new struct.  The values themselves are shared, not cloned.
     */
    @Override
    public TBase<Struct, Field> deepCopy() {
      Struct struct = new Struct();
      struct.fields.putAll(fields);
      return struct;
    }

    /**
     * Orders structs by class name, then by number of stored fields, then field by field: a
     * field missing from {@code other} sorts this struct after it, otherwise the two values are
     * compared with {@link TBaseHelper}.
     */
    @Override
    public int compareTo(Struct other) {
      if (!getClass().equals(other.getClass())) {
        return getClass().getName().compareTo(other.getClass().getName());
      }

      int lastComparison;

      lastComparison = Integer.valueOf(fields.size()).compareTo(other.fields.size());
      if (lastComparison != 0) {
        return lastComparison;
      }

      for (Map.Entry<Field, Object> entry : fields.entrySet()) {
        Field field = entry.getKey();
        // Every field iterated here is set on this struct, hence the constant TRUE.
        lastComparison = Boolean.TRUE.compareTo(other.isSet(field));
        if (lastComparison != 0) {
          return lastComparison;
        }
        lastComparison = TBaseHelper.compareTo(entry.getValue(), other.getFieldValue(field));
        if (lastComparison != 0) {
          return lastComparison;
        }
      }
      return 0;
    }

    @Override
    public void clear() {
      fields.clear();
    }

    @Override
    public Field fieldForId(int fieldId) {
      return Field.forId(fieldId);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/BackoffDecider.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffDecider.java b/commons/src/main/java/com/twitter/common/util/BackoffDecider.java new file mode 100644 index 0000000..defa660 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/BackoffDecider.java @@ -0,0 +1,666 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// =================================================================================================

package com.twitter.common.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import com.twitter.common.stats.StatsProvider;

import javax.annotation.Nullable;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/**
 * Handles logic for deciding whether to back off from calls to a backend.
 *
 * This works by offering a guard method {@link #shouldBackOff()}, which instructs the caller
 * whether they should avoid making the call.  The backoff logic will maintain statistics about
 * the failure rate, and push into a backoff state (silent period) when the failure rate exceeds
 * the configured threshold.  At the end of the quiet period, a recovery state will be entered,
 * during which the decider will allow traffic to ramp back up to full capacity.
 *
 * The expected use case looks something like this:
 *
 * <pre>
 * void sendRequestGuarded() {
 *   if (!decider.shouldBackOff()) {
 *     boolean success = sendRequestUnguarded();
 *     if (success) {
 *       decider.addSuccess();
 *     } else {
 *       decider.addFailure();
 *     }
 *   }
 * }
 * </pre>
 *
 * @author William Farner
 */
public class BackoffDecider {
  private static final Logger LOG = Logger.getLogger(BackoffDecider.class.getName());

  // The group that this decider is a part of.  Null when the decider is not grouped.
  private final Iterable<BackoffDecider> deciderGroup;

  // Tracks the current state together with the duration/end time of timed states.
  private final TimedStateMachine stateMachine;

  // Used in log messages and as the prefix of the exported counter and state machine names.
  private final String name;

  // Failure rate above which (once the window is seeded) the decider backs off.
  private final double toleratedFailureRate;

  // Sliding window of request/failure counts.
  @VisibleForTesting final RequestWindow requests;

  // Used to calculate backoff durations when in backoff state.
  private final BackoffStrategy strategy;

  // Fixed recovery period, or null to reuse the duration of the state being left.
  private final Amount<Long, Time> recoveryPeriod;
  // Duration of the most recent backoff; fed to the strategy to compute the next one and reset
  // to 0 when recovery completes.
  private long previousBackoffPeriodNs = 0;

  // Used for random selection during recovery period.
  private final Random random;

  private final Clock clock;
  // Counter incremented every time shouldBackOff() tells the caller to back off.
  private final AtomicLong backoffs;
  private final RecoveryType recoveryType;

  /**
   * Different types of recovery mechanisms to use after exiting the backoff state.
   */
  public static enum RecoveryType {
    // Randomly allows traffic to flow through, with a linearly-ascending probability.
    RANDOM_LINEAR,
    // Allows full traffic capacity to flow during the recovery period.
    FULL_CAPACITY
  }

  /**
   * Validates the configuration and wires up the request window, counter and state machine.
   * Instances are created through {@link Builder#build()}.
   *
   * @param name non-blank name of the decider
   * @param seedSize number of requests required before the failure rate is acted on; must be > 0
   * @param toleratedFailureRate failure rate threshold; must be >= 0 and < 1.0
   * @param deciderGroup group of deciders to coordinate with, or {@code null} for none
   * @param strategy strategy used to compute backoff durations
   * @param recoveryPeriod fixed recovery period (value must be > 0), or {@code null}
   * @param requestWindowNs length of the sliding statistics window in nanoseconds; must be > 0
   * @param numBuckets number of time slices in the window; must be > 0
   * @param recoveryType how traffic is admitted during recovery
   * @param statsProvider provider used to create the {@code name + "_backoffs"} counter
   * @param random source of randomness for {@link RecoveryType#RANDOM_LINEAR}
   * @param clock time source
   */
  private BackoffDecider(String name, int seedSize, double toleratedFailureRate,
      @Nullable Iterable<BackoffDecider> deciderGroup, BackoffStrategy strategy,
      @Nullable Amount<Long, Time> recoveryPeriod,
      long requestWindowNs, int numBuckets, RecoveryType recoveryType, StatsProvider statsProvider,
      Random random, Clock clock) {
    MorePreconditions.checkNotBlank(name);
    Preconditions.checkArgument(seedSize > 0);
    Preconditions.checkArgument(toleratedFailureRate >= 0 && toleratedFailureRate < 1.0);
    Preconditions.checkNotNull(strategy);
    Preconditions.checkArgument(recoveryPeriod == null || recoveryPeriod.getValue() > 0);
    Preconditions.checkArgument(requestWindowNs > 0);
    Preconditions.checkArgument(numBuckets > 0);
    Preconditions.checkNotNull(recoveryType);
    Preconditions.checkNotNull(statsProvider);
    Preconditions.checkNotNull(random);
    Preconditions.checkNotNull(clock);

    this.name = name;
    this.toleratedFailureRate = toleratedFailureRate;
    this.deciderGroup = deciderGroup;
    this.strategy = strategy;
    this.recoveryPeriod = recoveryPeriod;
    this.recoveryType = recoveryType;

    this.random = random;
    this.clock = clock;

    this.backoffs = statsProvider.makeCounter(name + "_backoffs");
    this.requests = new RequestWindow(requestWindowNs, numBuckets, seedSize);

    this.stateMachine = new TimedStateMachine(name);
  }

  /**
   * Checks whether the caller should back off and if not then returns immediately; otherwise the
   * method blocks until it is safe for the caller to proceed without backing off further based on
   * all data available at the time of this call.
   *
   * @return the time waited for, in milliseconds (the remaining time of the current state as
   *     sampled just before waiting), or 0 if no wait took place
   * @throws InterruptedException if the calling thread was interrupted while backing off
   */
  public long awaitBackoff() throws InterruptedException {
    if (shouldBackOff()) {
      long backoffTimeMs = stateMachine.getStateRemainingMs();

      if (backoffTimeMs > 0) {
        // Wait without holding any external locks.
        // NOTE(review): a single Object.wait(timeout) is permitted to return early; the returned
        // value is the requested wait, not a measured one.
        Object waitCondition = new Object();
        synchronized (waitCondition) {
          waitCondition.wait(backoffTimeMs);
        }
        return backoffTimeMs;
      }
    }
    return 0;
  }

  /**
   * Checks whether this decider instructs the caller that it should back off from the associated
   * backend.  This is determined based on the response history for the backend as well as the
   * backoff state of the decider group (if configured).
   *
   * Calling this method may itself advance the state machine (for example BACKOFF to RECOVERY
   * once the backoff has expired) and increments the backoff counter whenever it returns
   * {@code true}.
   *
   * @return {@code true} if the decider is in backoff mode, otherwise {@code false}.
   */
  @SuppressWarnings("fallthrough")
  public synchronized boolean shouldBackOff() {

    boolean preventRequest;
    switch (stateMachine.getState()) {
      case NORMAL:
        preventRequest = false;
        break;

      case BACKOFF:
        if (deciderGroup != null && allOthersBackingOff()) {
          LOG.info("Backends in group with " + name + " down, forcing back up.");
          stateMachine.transitionUnbounded(State.FORCED_NORMAL);
          return false;
        } else if (stateMachine.isStateExpired()) {
          // Without a fixed recovery period, recover for as long as the backoff lasted.
          long recoveryPeriodNs = recoveryPeriod == null ? stateMachine.getStateDurationNs()
              : recoveryPeriod.as(Time.NANOSECONDS);

          // The silent period has expired, move to recovery state (and drop to its case block).
          stateMachine.transition(State.RECOVERY, recoveryPeriodNs);
          LOG.info(String.format("%s recovering for %s ms", name,
              Amount.of(recoveryPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
        } else {
          preventRequest = true;
          break;
        }

      // Intentional fall-through from BACKOFF once the backoff period has expired.
      case RECOVERY:
        if (deciderGroup != null && allOthersBackingOff()) {
          return false;
        } else if (stateMachine.isStateExpired()) {
          // We have reached the end of the recovery period, return to normal.
          stateMachine.transitionUnbounded(State.NORMAL);
          previousBackoffPeriodNs = 0;
          preventRequest = false;
        } else {
          switch (recoveryType) {
            case RANDOM_LINEAR:
              // In the recovery period, allow request rate to return linearly to the full load.
              preventRequest = random.nextDouble() > stateMachine.getStateFractionComplete();
              break;
            case FULL_CAPACITY:
              preventRequest = false;
              break;
            default:
              throw new IllegalStateException("Unhandled recovery type " + recoveryType);
          }
        }

        break;

      case FORCED_NORMAL:
        if (!allOthersBackingOff()) {
          // We were in forced normal state, but at least one other backend is up, try recovering.
          stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs());
          preventRequest = false;
        } else {
          // NOTE(review): requests are prevented here while every other decider is still backing
          // off, which reads as the opposite of the FORCED_NORMAL description on State; confirm
          // this is intended.
          preventRequest = true;
        }

        break;

      default:
        LOG.severe("Unrecognized state: " + stateMachine.getState());
        preventRequest = false;
    }

    if (preventRequest) {
      backoffs.incrementAndGet();
    }
    return preventRequest;
  }

  /**
   * Reports whether every other decider in the group is in BACKOFF or FORCED_NORMAL state.
   * Returns {@code true} when the group contains no decider other than this one.  Must only be
   * called when {@code deciderGroup} is non-null.  Other deciders' states are read without
   * taking their locks.
   */
  private boolean allOthersBackingOff() {
    // Search for another decider that is not backing off.
    for (BackoffDecider decider : deciderGroup) {
      State deciderState = decider.stateMachine.getState();
      boolean inBackoffState = deciderState == State.BACKOFF || deciderState == State.FORCED_NORMAL;
      if ((decider != this) && !inBackoffState) {
        return false;
      }
    }

    return true;
  }

  /**
   * Records a failed request to the backend.
   */
  public void addFailure() {
    addResult(false);
  }

  /**
   * Records a successful request to the backend.
   */
  public void addSuccess() {
    addResult(true);
  }

  /**
   * Transitions the state to BACKOFF and logs a message appropriately if it is doing so because
   * of high fail rate or by force.  The backoff duration is obtained from the strategy, based on
   * the previous backoff duration, and is remembered for the next call.
   *
   * @param failRate rate of request failures on this host; only used in the log message.
   * @param force if {@code true}, forces the transition to BACKOFF.  Typically used in cases when
   *     the host was not found to be alive by LiveHostChecker.  In this method the flag only
   *     selects which log message is written.
   */
  public synchronized void transitionToBackOff(double failRate, boolean force) {
    long prevBackoffMs = Amount.of(previousBackoffPeriodNs, Time.NANOSECONDS)
        .as(Time.MILLISECONDS);

    long backoffPeriodNs = Amount.of(strategy.calculateBackoffMs(prevBackoffMs), Time.MILLISECONDS)
        .as(Time.NANOSECONDS);
    if (!force) {
      LOG.info(String.format("%s failure rate at %g, backing off for %s ms", name,failRate,
          Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
    } else {
      LOG.info(String.format("%s forced to back off for %s ms", name,
          Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
    }
    stateMachine.transition(State.BACKOFF, backoffPeriodNs);
    previousBackoffPeriodNs = backoffPeriodNs;
  }

  /**
   * Records one request outcome in the window and, if the window is seeded and the failure rate
   * exceeds the tolerated rate, moves the decider into BACKOFF.  Results reported while already
   * in BACKOFF are dropped.
   *
   * @param success whether the request succeeded
   */
  @SuppressWarnings("fallthrough")
  private synchronized void addResult(boolean success) {
    // Disallow statistics updating if we are in backoff state.
    if (stateMachine.getState() == State.BACKOFF) {
      return;
    }

    requests.addResult(success);
    double failRate = requests.getFailureRate();
    boolean highFailRate = requests.isSeeded() && (failRate > toleratedFailureRate);

    switch (stateMachine.getState()) {
      case NORMAL:
        if (!highFailRate) {
          // No-op.
          break;
        } else {
          // Artificially move into recovery state (by falling through) with a zero-duration
          // time window, to trigger the initial backoff period.
          stateMachine.setStateDurationNs(0);
        }

      // Intentional fall-through from NORMAL when the failure rate is too high.
      case RECOVERY:
        if (highFailRate) {
          // We were trying to recover, and the failure rate is still too high.  Go back to
          // backoff state for a longer duration.
          requests.reset();

          // transition the state machine to BACKOFF state, due to high fail rate.
          transitionToBackOff(failRate, false);
        } else {
          // Do nothing.  We only exit the recovery state by expiration.
        }
        break;

      case FORCED_NORMAL:
        if (!highFailRate) {
          stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs());
        }
        break;

      case BACKOFF:
        // Unreachable in practice: BACKOFF returns early at the top of this method.
        throw new IllegalStateException("Backoff state may only be exited by expiration.");
    }
  }

  /**
   * Creates a builder object.
   *
   * @param name Name for the backoff decider to build.
   * @return A builder.
   */
  public static Builder builder(String name) {
    return new Builder(name);
  }

  /**
   * Builder class to configure a BackoffDecider.
   *
   * The builder allows for customization of many different parameters to the BackoffDecider, while
   * defining defaults wherever possible.  The following defaults are used:
   *
   * <ul>
   * <li> seed size - The number of requests to accumulate before a backoff will be considered.
   * 100
   *
   * <li> tolerated failure rate - Maximum failure rate before backing off.
   * 0.5
   *
   * <li> decider group - Group this decider is a part of, to prevent complete backend failure.
   * null (disabled)
   *
   * <li> strategy - Used to calculate subsequent backoff durations.
   * TruncatedBinaryBackoff, initial 100 ms, max 10s
   *
   * <li> recovery period - Fixed recovery period while ramping traffic back to full capacity.
   * null (use last backoff period)
   *
   * <li> request window - Duration of the sliding window of requests to track statistics for.
   * 10 seconds
   *
   * <li> num buckets - The number of time slices within the request window, for stat expiration.
   * The sliding request window advances in intervals of request window / num buckets.
   * 100
   *
   * <li> recovery type - Defines behavior during the recovery period, and how traffic is permitted.
   * random linear
   *
   * <li> stat provider - The stats provider to export statistics to.
   * Stats.STATS_PROVIDER
   * </ul>
   *
   * Values are not validated by the setters; validation happens in {@link #build()} when the
   * decider is constructed.
   */
  public static class Builder {
    private String name;
    private int seedSize = 100;
    private double toleratedFailureRate = 0.5;
    private Set<BackoffDecider> deciderGroup = null;
    private BackoffStrategy strategy = new TruncatedBinaryBackoff(
        Amount.of(100L, Time.MILLISECONDS), Amount.of(10L, Time.SECONDS));
    private Amount<Long, Time> recoveryPeriod = null;
    private long requestWindowNs = Amount.of(10L, Time.SECONDS).as(Time.NANOSECONDS);
    private int numBuckets = 100;
    private RecoveryType recoveryType = RecoveryType.RANDOM_LINEAR;
    private StatsProvider statsProvider = Stats.STATS_PROVIDER;
    private Random random = Random.Util.newDefaultRandom();
    private Clock clock = Clock.SYSTEM_CLOCK;

    Builder(String name) {
      this.name = name;
    }

    /**
     * Sets the number of requests that must be accumulated before the error rate will be
     * calculated.  This improves the genesis problem where the first few requests are errors,
     * causing flapping in and out of backoff state.
     *
     * @param seedSize Request seed size.
     * @return A reference to the builder.
     */
    public Builder withSeedSize(int seedSize) {
      this.seedSize = seedSize;
      return this;
    }

    /**
     * Sets the tolerated failure rate for the decider.  If the rate is exceeded for the time
     * window, the decider begins backing off.
     *
     * @param toleratedRate The tolerated failure rate; must be at least 0.0 and less than 1.0,
     *     which is checked when the decider is built.
     * @return A reference to the builder.
     */
    public Builder withTolerateFailureRate(double toleratedRate) {
      this.toleratedFailureRate = toleratedRate;
      return this;
    }

    /**
     * Makes the decider a part of a group.  When a decider is a part of a group, it will monitor
     * the other deciders to ensure that all deciders do not back off at once.
     *
     * @param deciderGroup Group to make this decider a part of.  More deciders may be added to the
     *     group after this call is made.  The built decider is added to this set by
     *     {@link #build()}.
     * @return A reference to the builder.
     */
    public Builder groupWith(Set<BackoffDecider> deciderGroup) {
      this.deciderGroup = deciderGroup;
      return this;
    }

    /**
     * Overrides the default backoff strategy.
     *
     * @param strategy Backoff strategy to use.
     * @return A reference to the builder.
     */
    public Builder withStrategy(BackoffStrategy strategy) {
      this.strategy = strategy;
      return this;
    }

    /**
     * Overrides the default recovery period behavior.  By default, the recovery period is equal
     * to the previous backoff period (which is equivalent to setting the recovery period to null
     * here).  A non-null value here will assign a fixed recovery period.
     *
     * @param recoveryPeriod Fixed recovery period.
     * @return A reference to the builder.
     */
    public Builder withRecoveryPeriod(@Nullable Amount<Long, Time> recoveryPeriod) {
      this.recoveryPeriod = recoveryPeriod;
      return this;
    }

    /**
     * Sets the time window over which to analyze failures.  Beyond the time window, request history
     * is discarded (and ignored).
     *
     * @param requestWindow The analysis time window.
     * @return A reference to the builder.
     */
    public Builder withRequestWindow(Amount<Long, Time> requestWindow) {
      this.requestWindowNs = requestWindow.as(Time.NANOSECONDS);
      return this;
    }

    /**
     * Sets the number of time slices that the decider will use to partition aggregate statistics.
     *
     * @param numBuckets Bucket count.
     * @return A reference to the builder.
     */
    public Builder withBucketCount(int numBuckets) {
      this.numBuckets = numBuckets;
      return this;
    }

    /**
     * Sets the recovery mechanism to use when in the recovery period.
     *
     * @param recoveryType The recovery mechanism to use.
     * @return A reference to the builder.
     */
    public Builder withRecoveryType(RecoveryType recoveryType) {
      this.recoveryType = recoveryType;
      return this;
    }

    /**
     * Sets the stats provider that statistics should be exported to.
     *
     * @param statsProvider Stats provider to use.
     * @return A reference to the builder.
     */
    public Builder withStatsProvider(StatsProvider statsProvider) {
      this.statsProvider = statsProvider;
      return this;
    }

    /** Overrides the source of randomness; intended for tests. */
    @VisibleForTesting public Builder withRandom(Random random) {
      this.random = random;
      return this;
    }

    /** Overrides the clock; intended for tests. */
    @VisibleForTesting public Builder withClock(Clock clock) {
      this.clock = clock;
      return this;
    }

    /**
     * Gets a reference to the built decider object.  If a decider group was configured, the new
     * decider is added to it.
     *
     * @return A decider object.
     */
    public BackoffDecider build() {
      BackoffDecider decider = new BackoffDecider(name, seedSize, toleratedFailureRate,
          deciderGroup, strategy, recoveryPeriod, requestWindowNs, numBuckets, recoveryType,
          statsProvider, random, clock);
      if (deciderGroup != null) deciderGroup.add(decider);
      return decider;
    }
  }

  /**
   * Request and failure counts for one slice of the request window.  The slice start time is
   * taken from the enclosing decider's clock at construction.
   */
  private class TimeSlice {
    int requestCount = 0;
    int failureCount = 0;
    final long bucketStartNs;

    public TimeSlice() {
      bucketStartNs = clock.nowNanos();
    }
  }

  /**
   * Sliding window of request statistics, kept as a deque of {@link TimeSlice} buckets with the
   * newest bucket first.
   */
  class RequestWindow {
    // These store the sum of the respective fields contained within buckets.  Doing so removes the
    // need to accumulate the counts within the buckets every time the backoff state is
    // recalculated.
    @VisibleForTesting long totalRequests = 0;
    @VisibleForTesting long totalFailures = 0;

    private final long durationNs;
    // Length of one bucket: durationNs / bucketCount (integer division).
    private final long bucketLengthNs;
    private final int seedSize;

    // Stores aggregate request/failure counts for time slices.
    private final Deque<TimeSlice> buckets = Lists.newLinkedList();

    RequestWindow(long durationNs, int bucketCount, int seedSize) {
      this.durationNs = durationNs;
      this.bucketLengthNs = durationNs / bucketCount;
      buckets.addFirst(new TimeSlice());
      this.seedSize = seedSize;
    }

    /** Discards all statistics and starts over with a single fresh bucket. */
    void reset() {
      totalRequests = 0;
      totalFailures = 0;
      buckets.clear();
      buckets.addFirst(new TimeSlice());
    }

    /** Counts one request (and one failure if {@code success} is false) in the newest bucket. */
    void addResult(boolean success) {
      maybeShuffleBuckets();
      buckets.peekFirst().requestCount++;
      totalRequests++;

      if (!success) {
        buckets.peekFirst().failureCount++;
        totalFailures++;
      }
    }

    /**
     * If the newest bucket is at least one bucket length old, drops buckets that started more
     * than the window duration ago (subtracting their counts from the totals) and opens a new
     * bucket.
     */
    void maybeShuffleBuckets() {
      // Check if the first bucket is still relevant.
      if (clock.nowNanos() - buckets.peekFirst().bucketStartNs >= bucketLengthNs) {

        // Remove old buckets.
        while (!buckets.isEmpty()
               && buckets.peekLast().bucketStartNs < clock.nowNanos() - durationNs) {
          TimeSlice removed = buckets.removeLast();
          totalRequests -= removed.requestCount;
          totalFailures -= removed.failureCount;
        }

        buckets.addFirst(new TimeSlice());
      }
    }

    /** @return whether at least {@code seedSize} requests are currently in the window. */
    boolean isSeeded() {
      return totalRequests >= seedSize;
    }

    /** @return failures divided by requests in the window, or 0 when there are no requests. */
    double getFailureRate() {
      return totalRequests == 0 ? 0 : ((double) totalFailures) / totalRequests;
    }
  }

  private static enum State {
    NORMAL,        // All requests are being permitted.
    BACKOFF,       // Quiet period while waiting for backend to recover/improve.
    RECOVERY,      // Ramping period where an ascending fraction of requests is being permitted.
    FORCED_NORMAL  // All other backends in the group are backing off, so this one is forced normal.
  }

  /**
   * Wraps a {@link StateMachine} over {@link State} and additionally tracks the duration and end
   * time of the most recent timed transition, using the enclosing decider's clock.
   */
  private class TimedStateMachine {
    final StateMachine<State> stateMachine;

    // Set only by transition(); transitionUnbounded() leaves the previous values in place.
    private long stateEndNs;
    private long stateDurationNs;

    TimedStateMachine(String name) {
      stateMachine = StateMachine.<State>builder(name + "_backoff_state_machine")
          .addState(State.NORMAL, State.BACKOFF, State.FORCED_NORMAL)
          .addState(State.BACKOFF, State.RECOVERY, State.FORCED_NORMAL)
          .addState(State.RECOVERY, State.NORMAL, State.BACKOFF, State.FORCED_NORMAL)
          .addState(State.FORCED_NORMAL, State.RECOVERY)
          .initialState(State.NORMAL)
          .build();
    }

    State getState() {
      return stateMachine.getState();
    }

    /** Changes state without touching the recorded duration or end time. */
    void transitionUnbounded(State state) {
      stateMachine.transition(state);
    }

    /** Changes state and records that it lasts {@code durationNs} from the current time. */
    void transition(State state, long durationNs) {
      transitionUnbounded(state);
      this.stateEndNs = clock.nowNanos() + durationNs;
      this.stateDurationNs = durationNs;
    }

    long getStateDurationNs() {
      return stateDurationNs;
    }

    long getStateDurationMs() {
      return Amount.of(stateDurationNs, Time.NANOSECONDS).as(Time.MILLISECONDS);
    }

    /** Overrides the recorded duration without changing the state or the recorded end time. */
    void setStateDurationNs(long stateDurationNs) {
      this.stateDurationNs = stateDurationNs;
    }

    /** @return nanoseconds until the recorded end time; negative once it has passed. */
    long getStateRemainingNs() {
      return stateEndNs - clock.nowNanos();
    }

    long getStateRemainingMs() {
      return Amount.of(getStateRemainingNs(), Time.NANOSECONDS).as(Time.MILLISECONDS);
    }

    /**
     * @return the elapsed fraction of the recorded duration, computed as
     *     1 - remaining / duration.
     */
    double getStateFractionComplete() {
      // NOTE(review): a recorded duration of 0 makes this a floating-point division by zero
      // (non-finite result); confirm callers cannot reach here with a zero duration.
      return 1.0 - ((double) getStateRemainingNs()) / stateDurationNs;
    }

    /** @return whether the current time is strictly after the recorded end time. */
    boolean isStateExpired() {
      return clock.nowNanos() > stateEndNs;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/BackoffHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffHelper.java b/commons/src/main/java/com/twitter/common/util/BackoffHelper.java new file mode 100644 index 0000000..614ea21 --- /dev/null +++ 
b/commons/src/main/java/com/twitter/common/util/BackoffHelper.java @@ -0,0 +1,155 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.twitter.common.base.ExceptionalSupplier;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;

import java.util.logging.Logger;

/**
 * A utility for dealing with backoffs of retryable actions.
 *
 * <p>TODO(John Sirois): investigate synergies with BackoffDecider.
 *
 * @author John Sirois
 */
public class BackoffHelper {
  private static final Logger LOG = Logger.getLogger(BackoffHelper.class.getName());

  private static final Amount<Long,Time> DEFAULT_INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS);
  private static final Amount<Long,Time> DEFAULT_MAX_BACKOFF = Amount.of(1L, Time.MINUTES);

  private final Clock clock;
  private final BackoffStrategy backoffStrategy;

  /**
   * Creates a new BackoffHelper that uses truncated binary backoff starting at a 1 second backoff
   * and maxing out at a 1 minute backoff.
   */
  public BackoffHelper() {
    this(DEFAULT_INITIAL_BACKOFF, DEFAULT_MAX_BACKOFF);
  }

  /**
   * Creates a new BackoffHelper that uses truncated binary backoff starting at the given
   * {@code initialBackoff} and maxing out at the given {@code maxBackoff}.
   *
   * @param initialBackoff the initial amount of time to back off
   * @param maxBackoff the maximum amount of time to back off
   */
  public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) {
    this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff));
  }

  /**
   * Creates a new BackoffHelper that uses truncated binary backoff starting at the given
   * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. This will either:
   * <ul>
   *   <li>{@code stopAtMax == true} : throw {@code BackoffStoppedException} when maxBackoff is
   *   reached</li>
   *   <li>{@code stopAtMax == false} : continue backing off with maxBackoff</li>
   * </ul>
   *
   * @param initialBackoff the initial amount of time to back off
   * @param maxBackoff the maximum amount of time to back off
   * @param stopAtMax if true, this will throw {@code BackoffStoppedException} when the max backoff
   *     is reached
   */
  public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff,
      boolean stopAtMax) {
    this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff, stopAtMax));
  }

  /**
   * Creates a BackoffHelper that uses the given {@code backoffStrategy} to calculate backoffs
   * between retries.
   *
   * @param backoffStrategy the backoff strategy to use
   */
  public BackoffHelper(BackoffStrategy backoffStrategy) {
    this(Clock.SYSTEM_CLOCK, backoffStrategy);
  }

  /**
   * Creates a BackoffHelper with an explicit clock, so tests can control waiting.
   *
   * @param clock the clock used to wait between retries; must not be null
   * @param backoffStrategy the backoff strategy to use; must not be null
   */
  @VisibleForTesting BackoffHelper(Clock clock, BackoffStrategy backoffStrategy) {
    this.clock = Preconditions.checkNotNull(clock);
    this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy);
  }

  /**
   * Executes the given task using the configured backoff strategy until the task succeeds as
   * indicated by returning {@code true}.  A {@code false} or {@code null} result counts as a
   * failure and triggers another attempt.
   *
   * @param task the retryable task to execute until success
   * @throws InterruptedException if interrupted while waiting for the task to execute successfully
   * @throws BackoffStoppedException if the backoff stopped unsuccessfully
   * @throws E if the task throws
   */
  public <E extends Exception> void doUntilSuccess(final ExceptionalSupplier<Boolean, E> task)
      throws InterruptedException, BackoffStoppedException, E {
    // Adapt the boolean task to the "non-null means done" contract of doUntilResult.
    doUntilResult(new ExceptionalSupplier<Boolean, E>() {
      @Override public Boolean get() throws E {
        Boolean result = task.get();
        return Boolean.TRUE.equals(result) ? result : null;
      }
    });
  }

  /**
   * Executes the given task using the configured backoff strategy until the task succeeds as
   * indicated by returning a non-null value.  The first attempt is made immediately, without
   * any backoff.
   *
   * @param task the retryable task to execute until success
   * @return the result of the successfully executed task
   * @throws InterruptedException if interrupted while waiting for the task to execute successfully
   * @throws BackoffStoppedException if the backoff stopped unsuccessfully
   * @throws E if the task throws
   */
  public <T, E extends Exception> T doUntilResult(ExceptionalSupplier<T, E> task)
      throws InterruptedException, BackoffStoppedException, E {
    T result = task.get(); // give an immediate try
    return (result != null) ? result : retryWork(task);
  }

  /**
   * Retries {@code work} for as long as the strategy allows, waiting on the clock for the
   * strategy's next backoff before each attempt.
   *
   * @param work the task to retry
   * @return the first non-null result produced by {@code work}
   * @throws BackoffStoppedException if the strategy declines to continue before a result is
   *     obtained
   */
  private <T, E extends Exception> T retryWork(ExceptionalSupplier<T, E> work)
      throws E, InterruptedException, BackoffStoppedException {
    long currentBackoffMs = 0;
    while (backoffStrategy.shouldContinue(currentBackoffMs)) {
      currentBackoffMs = backoffStrategy.calculateBackoffMs(currentBackoffMs);
      LOG.fine("Operation failed, backing off for " + currentBackoffMs + "ms");
      clock.waitFor(currentBackoffMs);

      T result = work.get();
      if (result != null) {
        return result;
      }
    }
    // The message has no format arguments, so it is passed as a plain literal.
    throw new BackoffStoppedException("Backoff stopped without succeeding.");
  }

  /**
   * Occurs after the backoff strategy should stop.
   */
  public static class BackoffStoppedException extends RuntimeException {
    public BackoffStoppedException(String msg) {
      super(msg);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java b/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java new file mode 100644 index 0000000..42e0d28 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java @@ -0,0 +1,40 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +/** + * Encapsulates a strategy for backing off from an operation that repeatedly fails. + */ +public interface BackoffStrategy { + + /** + * Calculates the amount of time to backoff from an operation.
+ * + * @param lastBackoffMs the last used backoff in milliseconds where 0 signifies no backoff has + * been performed yet + * @return the amount of time in milliseconds to back off before retrying the operation + */ + long calculateBackoffMs(long lastBackoffMs); + + /** + * Returns whether to continue backing off. + * + * @param lastBackoffMs the last used backoff in milliseconds + * @return whether to continue backing off + */ + boolean shouldContinue(long lastBackoffMs); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/BuildInfo.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BuildInfo.java b/commons/src/main/java/com/twitter/common/util/BuildInfo.java new file mode 100644 index 0000000..9f08aa8 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/BuildInfo.java @@ -0,0 +1,111 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util; + +import java.io.InputStream; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.annotations.VisibleForTesting; + +import com.twitter.common.base.MorePreconditions; + +/** + * Handles loading of a build properties file, and provides keys to look up known values in the + * properties. + */ +public class BuildInfo { + + private static final Logger LOG = Logger.getLogger(BuildInfo.class.getName()); + + private static final String DEFAULT_BUILD_PROPERTIES_PATH = "build.properties"; + + private final String resourcePath; + + private Properties properties = null; + + /** + * Creates a build info container that will use the default properties file path. + */ + public BuildInfo() { + this(DEFAULT_BUILD_PROPERTIES_PATH); + } + + /** + * Creates a build info container, reading from the given path. + * + * @param resourcePath The resource path to read build properties from. + */ + public BuildInfo(String resourcePath) { + this.resourcePath = MorePreconditions.checkNotBlank(resourcePath); + } + + @VisibleForTesting + public BuildInfo(Properties properties) { + this.resourcePath = null; + this.properties = properties; + } + + private void fetchProperties() { + properties = new Properties(); + LOG.info("Fetching build properties from " + resourcePath); + InputStream in = ClassLoader.getSystemResourceAsStream(resourcePath); + if (in == null) { + LOG.warning("Failed to fetch build properties from " + resourcePath); + return; + } + + try { + properties.load(in); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to load properties file " + resourcePath, e); + } + } + + /** + * Fetches the properties stored in the resource location. 
+ * + * @return The loaded properties, or a default properties object if there was a problem loading + * the specified properties resource. + */ + public Properties getProperties() { + if (properties == null) fetchProperties(); + return properties; + } + + /** + * Values of keys that are expected to exist in the loaded properties file. + */ + public enum Key { + PATH("build.path"), + USER("build.user.name"), + MACHINE("build.machine"), + DATE("build.date"), + TIME("build.time"), + TIMESTAMP("build.timestamp"), + GIT_TAG("build.git.tag"), + GIT_REVISION("build.git.revision"), + GIT_REVISION_NUMBER("build.git.revision.number"), + GIT_BRANCHNAME("build.git.branchname"); + + public final String value; + private Key(String value) { + this.value = value; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Clock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Clock.java b/commons/src/main/java/com/twitter/common/util/Clock.java new file mode 100644 index 0000000..ff64716 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/Clock.java @@ -0,0 +1,73 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import java.io.Serializable; + +/** + * An abstraction of the system clock. + * + * @author John Sirois + */ +public interface Clock { + + /** + * A clock that returns the the actual time reported by the system. + * This clock is guaranteed to be serializable. + */ + Clock SYSTEM_CLOCK = new SerializableClock() { + @Override public long nowMillis() { + return System.currentTimeMillis(); + } + @Override public long nowNanos() { + return System.nanoTime(); + } + @Override public void waitFor(long millis) throws InterruptedException { + Thread.sleep(millis); + } + }; + + /** + * Returns the current time in milliseconds since the epoch. + * + * @return The current time in milliseconds since the epoch. + * @see System#currentTimeMillis() + */ + long nowMillis(); + + /** + * Returns the current time in nanoseconds. Should be used only for relative timing. + * See {@code System.nanoTime()} for tips on using the value returned here. + * + * @return A measure of the current time in nanoseconds. + * @see System#nanoTime() + */ + long nowNanos(); + + /** + * Waits for the given amount of time to pass on this clock before returning. + * + * @param millis the amount of time to wait in milliseconds + * @throws InterruptedException if this wait was interrupted + */ + void waitFor(long millis) throws InterruptedException; +} + +/** + * A typedef to support anonymous {@link Clock} implementations that are also {@link Serializable}. 
+ */ +interface SerializableClock extends Clock, Serializable { } http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/CommandExecutor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/CommandExecutor.java b/commons/src/main/java/com/twitter/common/util/CommandExecutor.java new file mode 100644 index 0000000..0e197dd --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/CommandExecutor.java @@ -0,0 +1,45 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +/** + * Asynchronous executor of enqueued tasks in a rate limited manner. + * + * @author Srinivasan Rajagopal + */ +public interface CommandExecutor { + + /** + * Enqueue a task to be executed with retry semantics defined. + * + * @param name Human readable name for this task. + * @param task task to execute. 
+ * @param exceptionClass Concrete exception type. + * @param maxTries num of tries in case of failure. + * @param retryDelay interval between retries in case of failure. + */ + <E extends Exception> void execute( + String name, + ExceptionalCommand<E> task, + Class<E> exceptionClass, + int maxTries, + Amount<Long, Time> retryDelay); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/DateUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/DateUtils.java b/commons/src/main/java/com/twitter/common/util/DateUtils.java new file mode 100644 index 0000000..52a3122 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/DateUtils.java @@ -0,0 +1,60 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * Utilities for working with java {@link Date}s. 
/**
 * Utilities for working with java {@link Date}s.
 *
 * @author John Sirois
 */
public final class DateUtils {

  private DateUtils() {
    // utility
  }

  /**
   * Returns the current instant.
   *
   * @return a new {@link Date} representing now
   */
  public static Date now() {
    return new Date();
  }

  /**
   * Returns the current time as whole seconds since the epoch.
   *
   * @return the current unix time in seconds
   */
  public static long nowUnixTime() {
    return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
  }

  /**
   * Converts a date to whole seconds since the epoch.
   *
   * @param date the date to convert
   * @return the unix time in seconds for {@code date}
   */
  public static long toUnixTime(Date date) {
    return TimeUnit.MILLISECONDS.toSeconds(date.getTime());
  }

  /**
   * Converts milliseconds since the epoch to whole seconds since the epoch.
   *
   * @param millisSinceEpoch the time in milliseconds since the epoch
   * @return the time in seconds, as converted by {@link TimeUnit#toSeconds(long)}
   */
  public static long toUnixTime(long millisSinceEpoch) {
    return TimeUnit.MILLISECONDS.toSeconds(millisSinceEpoch);
  }

  /**
   * Returns the date that lies {@code amount} units of {@code calendarField} before now.
   *
   * @param calendarField a {@link Calendar} field constant, e.g. {@link Calendar#DAY_OF_MONTH}
   * @param amount how many units of the field to go back
   * @return the shifted date
   */
  public static Date ago(int calendarField, int amount) {
    return ago(new Date(), calendarField, amount);
  }

  /**
   * Returns the date that lies {@code amount} units of {@code calendarField} before
   * {@code referenceDate}, computed with the default-locale, default-timezone calendar.
   *
   * @param referenceDate the date to count back from
   * @param calendarField a {@link Calendar} field constant, e.g. {@link Calendar#DAY_OF_MONTH}
   * @param amount how many units of the field to go back
   * @return the shifted date
   */
  public static Date ago(Date referenceDate, int calendarField, int amount) {
    Calendar shifted = Calendar.getInstance();
    shifted.setTime(referenceDate);
    shifted.add(calendarField, -amount);
    return shifted.getTime();
  }
}
/**
 * Utilities for working with Files.
 *
 * @author Florian Leibert
 */
public final class FileUtils {

  private FileUtils() {
    // utility
  }

  /**
   * Recursively deletes the path and all its content and returns true if it succeeds.
   * Note that the content could be partially deleted and the method still return false.
   *
   * @param path the path to delete; {@code null} is tolerated and yields {@code false}
   * @return true if the path itself was deleted
   */
  public static boolean forceDeletePath(File path) {
    if (path == null) {
      return false;
    }
    if (path.isDirectory()) {
      // listFiles() returns null on an I/O error or if the directory vanished in the meantime;
      // in that case fall through and let delete() report the outcome.
      File[] children = path.listFiles();
      if (children != null) {
        for (File child : children) {
          // Best effort: a failure to remove a child surfaces as a failed delete() of the
          // (then non-empty) parent below.
          forceDeletePath(child);
        }
      }
    }
    return path.delete();
  }
}
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.io.Closeable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +/** + * Low resolution implementation of a {@link com.twitter.common.util.Clock}, + * optimized for fast reads at the expense of precision. + * It works by caching the result of the system clock for a + * {@code resolution} amount of time. 
+ */ +public class LowResClock implements Clock, Closeable { + private static final ScheduledExecutorService GLOBAL_SCHEDULER = + Executors.newScheduledThreadPool(1, new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "LowResClock"); + t.setDaemon(true); + return t; + } + }); + + private volatile long time; + private final ScheduledFuture<?> updaterHandler; + private final Clock underlying; + + @VisibleForTesting + LowResClock(Amount<Long, Time> resolution, ScheduledExecutorService executor, Clock clock) { + long sleepTimeMs = resolution.as(Time.MILLISECONDS); + Preconditions.checkArgument(sleepTimeMs > 0); + underlying = clock; + Runnable ticker = new Runnable() { + @Override public void run() { + time = underlying.nowMillis(); + } + }; + + // Ensure the constructing thread sees a LowResClock with a valid (low-res) time by executing a + // blocking call now. + ticker.run(); + + updaterHandler = + executor.scheduleAtFixedRate(ticker, sleepTimeMs, sleepTimeMs, TimeUnit.MILLISECONDS); + } + + + /** + * Construct a LowResClock which wraps the system clock. + * This constructor will also schedule a periodic task responsible for + * updating the time every {@code resolution}. + */ + public LowResClock(Amount<Long, Time> resolution) { + this(resolution, GLOBAL_SCHEDULER, Clock.SYSTEM_CLOCK); + } + + /** + * Terminate the underlying updater task. + * Any subsequent usage of the clock will throw an {@link IllegalStateException}. 
+ */ + public void close() { + updaterHandler.cancel(true); + } + + @Override + public long nowMillis() { + checkNotClosed(); + return time; + } + + @Override + public long nowNanos() { + return nowMillis() * 1000 * 1000; + } + + @Override + public void waitFor(long millis) throws InterruptedException { + checkNotClosed(); + underlying.waitFor(millis); + } + + private void checkNotClosed() { + if (updaterHandler.isCancelled()) { + throw new IllegalStateException("LowResClock invoked after being closed!"); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/ParsingUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/ParsingUtil.java b/commons/src/main/java/com/twitter/common/util/ParsingUtil.java new file mode 100644 index 0000000..69d5624 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/ParsingUtil.java @@ -0,0 +1,56 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.util; + +import com.google.common.base.Preconditions; + +import com.twitter.common.collections.Pair; + +/** + * Common methods for parsing configs. + * + * @author John Sirois + */ +public class ParsingUtil { + /** + * Parses a string as a range between one integer and another. The integers must be separated by + * a hypen character (space padding is acceptable). Additionally, the first integer + * (left-hand side) must be less than or equal to the second (right-hand side). + * + * @param rangeString The string to parse as an integer range. + * @return A pair of the parsed integers. + */ + public static Pair<Integer, Integer> parseRange(String rangeString) { + if (rangeString == null) return null; + + String[] startEnd = rangeString.split("-"); + Preconditions.checkState( + startEnd.length == 2, "Shard range format: start-end (e.g. 1-4)"); + int start; + int end; + try { + start = Integer.parseInt(startEnd[0].trim()); + end = Integer.parseInt(startEnd[1].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse shard range.", e); + } + + Preconditions.checkState( + start <= end, "The left-hand side of a shard range must be <= the right-hand side."); + return Pair.of(start, end); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/QueueDrainer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/QueueDrainer.java b/commons/src/main/java/com/twitter/common/util/QueueDrainer.java new file mode 100644 index 0000000..32f010e --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/QueueDrainer.java @@ -0,0 +1,56 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +import com.google.common.base.Preconditions; + +/** + * Joins a task queue with an executor service, to add control over when + * tasks are actually made available for execution. + * + * @author Srinivasan Rajagopal + */ +public class QueueDrainer<T extends Runnable> implements Runnable { + + private final Executor taskExecutor; + private final BlockingQueue<T> blockingQueue; + + /** + * Creates a QueueDrainer that associates the queue with an executorService. + * + * @param taskExecutor Executor to execute a task if present. + * @param blockingQueue Queue to poll if there is a runnable to execute. + */ + public QueueDrainer(Executor taskExecutor, BlockingQueue<T> blockingQueue) { + this.taskExecutor = Preconditions.checkNotNull(taskExecutor); + this.blockingQueue = Preconditions.checkNotNull(blockingQueue); + } + + /** + * Picks tasks from the Queue to execute if present else no-op. 
+ */ + @Override + public void run() { + Runnable command = blockingQueue.poll(); + if (command != null) { + taskExecutor.execute(command); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Random.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Random.java b/commons/src/main/java/com/twitter/common/util/Random.java new file mode 100644 index 0000000..a08d712 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/Random.java @@ -0,0 +1,81 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import com.google.common.base.Preconditions; + +/** + * An interface to define the common functionality that is required for generating random values. 
+ * + * @author William Farner + */ +public interface Random { + + /** + * @see java.util.Random#nextDouble() + */ + public double nextDouble(); + + /** + * @see java.util.Random#nextInt(int) + */ + public int nextInt(int n); + + /** + * A Random that wraps a java.util.Random. + */ + static class SystemRandom implements Random { + private final java.util.Random rand; + + public SystemRandom(java.util.Random rand) { + this.rand = Preconditions.checkNotNull(rand); + } + + @Override + public double nextDouble() { + return rand.nextDouble(); + } + + @Override + public int nextInt(int n) { + return rand.nextInt(n); + } + } + + // Utility class. + public static class Util { + private Util() {} + + /** + * Creates a new Random based off the default system Random. + * @return A new default Random. + */ + public static Random newDefaultRandom() { + return new SystemRandom(new java.util.Random()); + } + + /** + * Adapts a java.util.Random into a Random. + * + * @param rand The java.util.Random to adapt. + * @return A new Random. + */ + public static Random fromSystemRandom(java.util.Random rand) { + return new SystemRandom(rand); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/RangeNormalizer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/RangeNormalizer.java b/commons/src/main/java/com/twitter/common/util/RangeNormalizer.java new file mode 100644 index 0000000..143bc1a --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/RangeNormalizer.java @@ -0,0 +1,91 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +//************************************************************************ +// +// Summize +// +// This work protected by US Copyright Law and contains proprietary and +// confidential trade secrets. +// +// (c) Copyright 2006 Summize, ALL RIGHTS RESERVED. +// +//************************************************************************ +package com.twitter.common.util; + +/** + * Generic range normalizer class. Values must be positive. 
/**
 * Generic range normalizer class: linearly maps a value from a source range {@code [minA, maxA]}
 * onto a target range {@code [minB, maxB]}. The original documentation states that values must
 * be positive; this is not enforced by the code.
 *
 * @author Abdur Chowdhury
 */
public class RangeNormalizer {
  private final double sourceMin;
  private final double sourceSpan;
  private final double targetMin;
  private final double targetSpan;
  private final double targetMid;

  /**
   * Creates a normalizer for a fixed pair of ranges.
   *
   * @param minA lower bound of the source range
   * @param maxA upper bound of the source range
   * @param minB lower bound of the target range
   * @param maxB upper bound of the target range
   */
  public RangeNormalizer(double minA, double maxA, double minB, double maxB) {
    sourceMin = minA;
    sourceSpan = maxA - minA;
    targetMin = minB;
    targetSpan = maxB - minB;
    targetMid = minB + (targetSpan / 2f);
  }

  /**
   * Maps {@code value} from the source range onto the target range.
   *
   * @param value the value to normalize
   * @return the mapped value, or the midpoint of the target range when the source range is empty
   */
  public double normalize(double value) {
    if (sourceSpan == 0) {
      // No input range to scale by: fall back to the middle of the target range.
      return targetMid;
    }
    return ((value - sourceMin) / sourceSpan) * targetSpan + targetMin;
  }

  /**
   * One-shot variant of {@link #normalize(double)}. Note that, unlike the instance method, an
   * empty source range ({@code minA == maxA}) yields {@code minB} rather than the midpoint.
   *
   * @param value the value to normalize
   * @param minA lower bound of the source range
   * @param maxA upper bound of the source range
   * @param minB lower bound of the target range
   * @param maxB upper bound of the target range
   * @return the mapped value, or {@code minB} when the source range is empty
   */
  public static double normalize(double value, double minA, double maxA, double minB, double maxB) {
    boolean emptySource = (minA == maxA);
    if (emptySource) {
      // Return something in the target range instead of dividing by zero.
      return minB;
    }
    return ((value - minA) / (maxA - minA)) * (maxB - minB) + minB;
  }

  /**
   * Rounds {@code rating} to the nearest integer and snaps it onto a step distribution:
   * 2, 4, 6 and 8 move down by one, 9 moves up to 10, every other value is kept.
   *
   * @param rating the rating to snap
   * @return the snapped rating
   */
  public static float normalizeToStepDistribution(double rating) {
    int rounded = (int) Math.round(rating);
    switch (rounded) {
      case 2:
      case 4:
      case 6:
      case 8:
        return (float) (rounded - 1);
      case 9:
        return (float) 10;
      default:
        return (float) rounded;
    }
  }
}
b/commons/src/main/java/com/twitter/common/util/RateLimitedCommandExecutor.java new file mode 100644 index 0000000..508fcf5 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/RateLimitedCommandExecutor.java @@ -0,0 +1,93 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * CommandExecutor that invokes {@code queueDrainer} with a best-effort + * mechanism to execute with a fixed interval between requests of {@code + * intervalBetweenRequests}. 
+ * + * @author Srinivasan Rajagopal + */ +public class RateLimitedCommandExecutor implements CommandExecutor { + + private static final Logger LOG = Logger.getLogger(RateLimitedCommandExecutor.class.getName()); + + private final BlockingQueue<RetryingRunnable<?>> blockingQueue; + + /** + * Create a CommandExecutor that executes enquequed tasks in the task + * executor with specified interval between executions. + * + * @param taskExecutor executor for periodic execution of enqueued tasks. + * @param intervalBetweenRequests interval between requests to rate limit + * request rate. + * @param queueDrainer A runnable that is responsible for draining the queue. + * @param blockingQueue Queue to keep outstanding work in. + */ + public RateLimitedCommandExecutor( + ScheduledExecutorService taskExecutor, + Amount<Long, Time> intervalBetweenRequests, + Runnable queueDrainer, + BlockingQueue<RetryingRunnable<?>> blockingQueue) { + + checkNotNull(taskExecutor); + checkNotNull(intervalBetweenRequests); + checkArgument(intervalBetweenRequests.as(Time.MILLISECONDS) > 0); + checkNotNull(queueDrainer); + this.blockingQueue = checkNotNull(blockingQueue); + taskExecutor.scheduleWithFixedDelay( + getSafeRunner(queueDrainer), + 0, + intervalBetweenRequests.as(Time.MILLISECONDS), + TimeUnit.MILLISECONDS); + } + + private static Runnable getSafeRunner(final Runnable runnable) { + return new Runnable() { + @Override public void run() { + try { + runnable.run(); + } catch (RuntimeException t) { + LOG.log(Level.INFO, " error processing task " + runnable); + } + } + }; + } + + @Override + public <E extends Exception> void execute(String name, ExceptionalCommand<E> task, + Class<E> exceptionClass, int numTries, Amount<Long, Time> retryDelay) { + blockingQueue.add(new RetryingRunnable<E>(name, task, exceptionClass, + numTries, retryDelay, this)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/RetryingRunnable.java 
---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/RetryingRunnable.java b/commons/src/main/java/com/twitter/common/util/RetryingRunnable.java new file mode 100644 index 0000000..a9e7aba --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/RetryingRunnable.java @@ -0,0 +1,134 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Throwables; + +import org.apache.commons.lang.builder.ToStringBuilder; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A runnable task that is retried in a user-configurable fashion. + * + * @param <E> The type of exception that the ExceptionalCommand throws. 
+ * + * @author Utkarsh Srivastava + */ +public class RetryingRunnable<E extends Exception> implements Runnable { + private final String name; + private final int tryNum; + private final int numTries; + private final Amount<Long, Time> retryDelay; + private final ExceptionalCommand<E> task; + private final CommandExecutor commandExecutor; + private final Class<E> exceptionClass; + + private static final Logger LOG = Logger.getLogger(RetryingRunnable.class.getName()); + + /** + * Create a Task with name {@code name} that executes at most {@code numTries} + * in case of failure with an interval of {@code retryDelay} between attempts. + * + * @param name Human readable name for this task. + * @param task the task to execute. + * @param exceptionClass class of the exception thrown by the task. + * @param numTries the total number of times to try. + * @param retryDelay the delay between successive tries. + * @param commandExecutor Executor to resubmit retries to. + * @param tryNum the seq number of this try. + */ + public RetryingRunnable( + String name, + ExceptionalCommand<E> task, + Class<E> exceptionClass, + int numTries, + Amount<Long, Time> retryDelay, + CommandExecutor commandExecutor, + int tryNum) { + + this.name = checkNotNull(name); + this.task = checkNotNull(task); + this.exceptionClass = checkNotNull(exceptionClass); + this.retryDelay = checkNotNull(retryDelay); + this.commandExecutor = checkNotNull(commandExecutor); + checkArgument(numTries > 0); + this.tryNum = tryNum; + this.numTries = numTries; + } + + /** + * Create a Task with name {@code name} that executes at most {@code numTries} + * in case of failure with an interval of {@code retryDelay} between attempts + * and sets tryNum to be the first (=1). + * + * @param name Human readable name for this task. + * @param task the task to execute. + * @param exceptionClass class of the exception thrown by the task. + * @param numTries the total number of times to try. 
+ * @param retryDelay the delay between successive tries. + * @param commandExecutor Executor to resubmit retries to. + */ + public RetryingRunnable( + String name, + ExceptionalCommand<E> task, + Class<E> exceptionClass, + int numTries, + Amount<Long, Time> retryDelay, + CommandExecutor commandExecutor) { + + this(name, task, exceptionClass, numTries, retryDelay, commandExecutor, /*tryNum=*/ 1); + } + + @Override + public void run() { + try { + task.execute(); + } catch (Exception e) { + if (e.getClass().isAssignableFrom(exceptionClass)) { + if (tryNum < numTries) { + commandExecutor.execute(name, task, exceptionClass, numTries - 1, retryDelay); + } else { + LOG.log(Level.INFO, "Giving up on task: " + name + " " + + "after " + "trying " + numTries + " times" + ".", e); + } + } else { + LOG.log(Level.INFO, "Giving up on task: " + name + " after trying " + + numTries + " times. " + "due to unhandled exception ", e); + throw Throwables.propagate(e); + } + } + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("name", name) + .append("tryNum", tryNum) + .append("numTries", numTries) + .append("retryDelay", retryDelay) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Sampler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Sampler.java b/commons/src/main/java/com/twitter/common/util/Sampler.java new file mode 100644 index 0000000..8aa26b7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/Sampler.java @@ -0,0 +1,57 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.util; + +import com.google.common.base.Preconditions; + +/** + * A sampler that implements logic for fractional random selection. + * + * @author William Farner + */ +public class Sampler { + + private final Random rand; + private final double threshold; + + /** + * Creates a new sampler using the default system {@link Random}. + * + * @param selectPercent Percentage to randomly select, must be between 0 and 100 (inclusive). + */ + public Sampler(float selectPercent) { + this(selectPercent, Random.Util.newDefaultRandom()); + } + + /** + * Creates a new sampler using the provided {@link Random}. + * + * @param selectPercent Percentage to randoml select, must be between 0 and 100 (inclusive). + * @param rand The random utility to use for generating random numbers. 
+ */ + public Sampler(float selectPercent, Random rand) { + Preconditions.checkArgument((selectPercent >= 0) && (selectPercent <= 100), + "Invalid selectPercent value: " + selectPercent); + + this.threshold = selectPercent / 100; + this.rand = Preconditions.checkNotNull(rand); + } + + public boolean select() { + return rand.nextDouble() < threshold; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/StartWatch.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/StartWatch.java b/commons/src/main/java/com/twitter/common/util/StartWatch.java new file mode 100644 index 0000000..db08a55 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/util/StartWatch.java @@ -0,0 +1,51 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +//************************************************************************ +// +// Summize +// +// This work protected by US Copyright Law and contains proprietary and +// confidential trade secrets. 
+// +// (c) Copyright 2007 Summize, ALL RIGHTS RESERVED. +// +//************************************************************************ + +package com.twitter.common.util; + +import org.apache.commons.lang.time.StopWatch; + +public class StartWatch extends StopWatch { + public StartWatch() { + super(); + } + + public void start() { + _started = true; + super.start(); + } + + public void resume() { + if (!_started) { + start(); + } else { + super.resume(); + } + } + + private boolean _started = false; +}
