http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java b/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java new file mode 100644 index 0000000..75a8449 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java @@ -0,0 +1,566 @@ +// ================================================================================================= +// Copyright 2013 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import java.util.Arrays; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Data; + +/** + * <p> + * Implements Histogram structure for computing approximate quantiles. + * The implementation is based on the following paper: + * + * <pre> + * [MP80] Munro & Paterson, "Selection and Sorting with Limited Storage", + * Theoretical Computer Science, Vol 12, p 315-323, 1980. + * </pre> + * </p> + * <p> + * You could read a detailed description of the same algorithm here: + * + * <pre> + * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other + * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM + * SIGMOD, Vol 27, No 2, p 426-435, June 1998. + * </pre> + * </p> + * <p> + * There's a good explanation of the algorithm in the Sawzall source code + * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc + * </p> + * Here's a schema of the tree: + * <pre> + * [4] level 3, weight=rootWeight=8 + * | + * [3] level 2, weight=4 + * | + * [2] level 1, weight=2 + * / \ + * [0] [1] level 0, weight=1 + * </pre> + * <p> + * {@code [i]} represents {@code buffer[i]} + * The depth of the tree is limited to a maximum value + * Every buffer has the same size + * </p> + * <p> + * We add element in {@code [0]} or {@code [1]}. + * When {@code [0]} and {@code [1]} are full, we collapse them, it generates a temporary buffer + * of weight 2, if {@code [2]} is empty, we put the collapsed buffer into {@code [2]} otherwise + * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty and + * so on... 
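To make the collapse cascade concrete, here is a small standalone sketch (not part of this commit) that models which buffer levels end up occupied after each collapse of the two leaves. The occupancy pattern follows the binary representation of the number of collapses performed, which is the same property the isBufferEmpty() method later in this class relies on.

import java.util.Arrays;

// Illustration only: a toy model of the collapse cascade, not the histogram's real buffers.
public final class CollapseCascadeSketch {
  public static void main(String[] args) {
    int maxDepth = 5;
    boolean[] occupied = new boolean[maxDepth + 1]; // levels 2..maxDepth hold collapsed buffers
    for (int collapses = 1; collapses <= 8; collapses++) {
      int level = 2;
      // Like binary carry propagation: merge upward while the target level is already full.
      while (level < maxDepth && occupied[level]) {
        occupied[level] = false;
        level++;
      }
      occupied[level] = true; // at maxDepth the root simply absorbs further merges
      System.out.println("collapses=" + collapses + " occupied=" + Arrays.toString(occupied));
    }
  }
}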
+ * </p> + */ +public final class ApproximateHistogram implements Histogram { + @VisibleForTesting + public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000); + @VisibleForTesting + public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB); + @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long + + // See above + @VisibleForTesting long[][] buffer; + @VisibleForTesting long count = 0L; + @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves + @VisibleForTesting int currentTop = 1; + @VisibleForTesting int[] indices; // member for optimization reason + private boolean leavesSorted = true; + private int rootWeight = 1; + private long[][] bufferPool; // pool of 2 buffers (used for merging) + private int bufferSize; + private int maxDepth; + + /** + * Private init method that is called only by constructors. + * All allocations are done in this method. + * + * @param bufSize size of each buffer + * @param depth maximum depth of the tree of buffers + */ + @VisibleForTesting + void init(int bufSize, int depth) { + bufferSize = bufSize; + maxDepth = depth; + bufferPool = new long[2][bufferSize]; + indices = new int[depth + 1]; + buffer = new long[depth + 1][bufferSize]; + // only allocate the first 2 buffers, lazily allocate the others. + allocate(0); + allocate(1); + Arrays.fill(buffer, 2, buffer.length, null); + clear(); + } + + @VisibleForTesting + ApproximateHistogram(int bufSize, int depth) { + init(bufSize, depth); + } + + /** + * Constructor with precision constraint, it will allocated as much memory as require to match + * this precision constraint. + * @param precision the requested precision + */ + public ApproximateHistogram(Precision precision) { + Preconditions.checkNotNull(precision); + int depth = computeDepth(precision.getEpsilon(), precision.getN()); + int bufSize = computeBufferSize(depth, precision.getN()); + init(bufSize, depth); + } + + /** + * Constructor with memory constraint, it will find the best possible precision that satisfied + * the memory constraint. + * @param maxMemory the maximum amount of memory that the instance will take + */ + public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) { + Preconditions.checkNotNull(maxMemory); + Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES), + "at least 1KB is required for an Histogram"); + + double epsilon = DEFAULT_PRECISION.getEpsilon(); + int n = expectedSize; + int depth = computeDepth(epsilon, n); + int bufSize = computeBufferSize(depth, n); + long maxBytes = maxMemory.as(Data.BYTES); + + // Increase precision if the maxMemory allow it, otherwise reduce precision. (by 5% steps) + boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes; + double multiplier = tooMuchMem ? 1.05 : 0.95; + while((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) { + epsilon *= multiplier; + if (epsilon < 0.00001) { + // for very high memory constraint increase N as well + n *= 10; + epsilon = DEFAULT_PRECISION.getEpsilon(); + } + depth = computeDepth(epsilon, n); + bufSize = computeBufferSize(depth, n); + } + if (!tooMuchMem) { + // It's ok to consume less memory than the constraint + // but we never have to consume more! + depth = computeDepth(epsilon / multiplier, n); + bufSize = computeBufferSize(depth, n); + } + + init(bufSize, depth); + } + + /** + * Constructor with memory constraint. 
+ * @see #ApproximateHistogram(Amount, int) + */ + public ApproximateHistogram(Amount<Long, Data> maxMemory) { + this(maxMemory, DEFAULT_PRECISION.getN()); + } + + /** + * Default Constructor. + * @see #ApproximateHistogram(Amount) + */ + public ApproximateHistogram() { + this(DEFAULT_MAX_MEMORY); + } + + @Override + public synchronized void add(long x) { + // if the leaves of the tree are full, "collapse" recursively the tree + if (leafCount == 2 * bufferSize) { + Arrays.sort(buffer[0]); + Arrays.sort(buffer[1]); + recCollapse(buffer[0], 1); + leafCount = 0; + } + + // Now we're sure there is space for adding x + if (leafCount < bufferSize) { + buffer[0][leafCount] = x; + } else { + buffer[1][leafCount - bufferSize] = x; + } + leafCount++; + count++; + leavesSorted = (leafCount == 1); + } + + @Override + public synchronized long getQuantile(double q) { + Preconditions.checkArgument(0.0 <= q && q <= 1.0, + "quantile must be in the range 0.0 to 1.0 inclusive"); + if (count == 0) { + return 0L; + } + + // the two leaves are the only buffer that can be partially filled + int buf0Size = Math.min(bufferSize, leafCount); + int buf1Size = Math.max(0, leafCount - buf0Size); + long sum = 0; + long target = (long) Math.ceil(count * (1.0 - q)); + int i; + + if (! leavesSorted) { + Arrays.sort(buffer[0], 0, buf0Size); + Arrays.sort(buffer[1], 0, buf1Size); + leavesSorted = true; + } + Arrays.fill(indices, bufferSize - 1); + indices[0] = buf0Size - 1; + indices[1] = buf1Size - 1; + + do { + i = biggest(indices); + indices[i]--; + sum += weight(i); + } while (sum < target); + return buffer[i][indices[i] + 1]; + } + + @Override + public synchronized long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } + + @Override + public synchronized void clear() { + count = 0L; + leafCount = 0; + currentTop = 1; + rootWeight = 1; + leavesSorted = true; + } + + /** + * MergedHistogram is a Wrapper on top of multiple histograms, it gives a view of all the + * underlying histograms as it was just one. + * Note: Should only be used for querying the underlying histograms. 
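The getQuantile() walk above starts from the largest element across all buffers and steps downward, accumulating each element's weight, until the running total reaches ceil(count * (1 - q)). A self-contained toy version of that walk, assuming two already-sorted buffers with illustrative weights:

public final class WeightedQuantileWalk {
  public static void main(String[] args) {
    long[][] buffers = { {1, 4, 9}, {2, 6, 8} }; // each buffer already sorted ascending
    int[] weights = { 1, 4 };                    // every element of buffers[1] stands for 4 inputs
    long count = 3 * 1 + 3 * 4;                  // 15 represented inputs
    double q = 0.5;                              // approximate median

    int[] idx = { buffers[0].length - 1, buffers[1].length - 1 };
    long target = (long) Math.ceil(count * (1.0 - q));
    long sum = 0;
    int chosen = -1;
    do {
      // Pick the largest element that has not been consumed yet.
      long biggest = Long.MIN_VALUE;
      for (int i = 0; i < buffers.length; i++) {
        if (idx[i] >= 0 && buffers[i][idx[i]] > biggest) {
          biggest = buffers[i][idx[i]];
          chosen = i;
        }
      }
      sum += weights[chosen];
      idx[chosen]--;
    } while (sum < target);

    // Prints 6: the median of {1,2,2,2,2,4,6,6,6,6,8,8,8,8,9}.
    System.out.println("approximate quantile = " + buffers[chosen][idx[chosen] + 1]);
  }
}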
+ */ + private static class MergedHistogram implements Histogram { + private final ApproximateHistogram[] histograms; + + private MergedHistogram(ApproximateHistogram[] histograms) { + this.histograms = histograms; + } + + @Override + public void add(long x) { + /* Ignore, Shouldn't be used */ + assert(false); + } + + @Override + public void clear() { + /* Ignore, Shouldn't be used */ + assert(false); + } + + @Override + public long getQuantile(double quantile) { + Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0, + "quantile must be in the range 0.0 to 1.0 inclusive"); + + long count = initIndices(); + if (count == 0) { + return 0L; + } + + long sum = 0; + long target = (long) Math.ceil(count * (1.0 - quantile)); + int iHist = -1; + int iBiggest = -1; + do { + long biggest = Long.MIN_VALUE; + for (int i = 0; i < histograms.length; i++) { + ApproximateHistogram hist = histograms[i]; + int indexBiggest = hist.biggest(hist.indices); + if (indexBiggest >= 0) { + long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]]; + if (iBiggest == -1 || biggest <= value) { + iBiggest = indexBiggest; + biggest = value; + iHist = i; + } + } + } + histograms[iHist].indices[iBiggest]--; + sum += histograms[iHist].weight(iBiggest); + } while (sum < target); + + ApproximateHistogram hist = histograms[iHist]; + int i = hist.indices[iBiggest]; + return hist.buffer[iBiggest][i + 1]; + } + + @Override + public synchronized long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } + + /** + * Initialize the indices array for each Histogram and return the global count. + */ + private long initIndices() { + long count = 0L; + for (int i = 0; i < histograms.length; i++) { + ApproximateHistogram h = histograms[i]; + int[] indices = h.indices; + count += h.count; + int buf0Size = Math.min(h.bufferSize, h.leafCount); + int buf1Size = Math.max(0, h.leafCount - buf0Size); + + if (! h.leavesSorted) { + Arrays.sort(h.buffer[0], 0, buf0Size); + Arrays.sort(h.buffer[1], 0, buf1Size); + h.leavesSorted = true; + } + Arrays.fill(indices, h.bufferSize - 1); + indices[0] = buf0Size - 1; + indices[1] = buf1Size - 1; + } + return count; + } + } + + /** + * Return a MergedHistogram + * @param histograms array of histograms to merged together + * @return a new Histogram + */ + public static Histogram merge(ApproximateHistogram[] histograms) { + return new MergedHistogram(histograms); + } + + /** + * We compute the "smallest possible b" satisfying two inequalities: + * 1) (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N + * 2) k * (2 ^ (b - 1)) >= N + * + * For an explanation of these inequalities, please read the Munro-Paterson or + * the Manku-Rajagopalan-Linday papers. + */ + @VisibleForTesting static int computeDepth(double epsilon, long n) { + int b = 2; + while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) { + b += 1; + } + return b; + } + + @VisibleForTesting static int computeBufferSize(int depth, long n) { + return (int) (n / (1L << (depth - 1))); + } + + /** + * Return an estimation of the memory used by an instance. + * The size is due to: + * - a fix cost (76 bytes) for the class + fields + * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE) + * - indices: 16 + sizeof(Integer) * (depth + 1) + * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE) + * + * Note: This method is tested with unit test, it will break if you had new fields. 
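Taken together, the two inequalities (which fix the depth b), the buffer size k = N / 2^(b-1), and the memoryUsage() estimate drive the memory-bounded constructor's search for a usable epsilon. A standalone sketch of that trade-off follows; it mirrors the static helpers above but omits the in-class refinements (bumping N for very small epsilon and backing off one step at the end).

public final class MemoryBudgetSketch {
  static final long ELEM_SIZE = 8; // sizeof long

  static int computeDepth(double epsilon, long n) {
    int b = 2;
    while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) {
      b += 1;
    }
    return b;
  }

  static int computeBufferSize(int depth, long n) {
    return (int) (n / (1L << (depth - 1)));
  }

  static long memoryUsage(int bufferSize, int depth) {
    return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3));
  }

  public static void main(String[] args) {
    long maxBytes = 12L * 1024;  // a 12 KB budget, the class default
    long n = 100 * 1000;         // expected number of inserted elements
    double epsilon = 0.02;       // starting precision

    int depth = computeDepth(epsilon, n);
    int bufSize = computeBufferSize(depth, n);
    boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes;
    double multiplier = tooMuchMem ? 1.05 : 0.95;

    // Loosen or tighten epsilon in 5% steps until the estimated footprint crosses the budget.
    while ((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) {
      epsilon *= multiplier;
      depth = computeDepth(epsilon, n);
      bufSize = computeBufferSize(depth, n);
    }
    System.out.printf("epsilon=%.4f depth=%d bufSize=%d estimatedBytes=%d%n",
        epsilon, depth, bufSize, memoryUsage(bufSize, depth));
  }
}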
+ * @param bufferSize the size of a buffer + * @param depth the depth of the tree of buffer (depth + 1 buffers) + */ + @VisibleForTesting + static long memoryUsage(int bufferSize, int depth) { + return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3)); + } + + /** + * Return the level of the biggest element (using the indices array 'ids' + * to track which elements have been already returned). Every buffer has + * already been sorted at this point. + * @return the level of the biggest element or -1 if no element has been found + */ + @VisibleForTesting + int biggest(final int[] ids) { + long biggest = Long.MIN_VALUE; + final int id0 = ids[0], id1 = ids[1]; + int iBiggest = -1; + + if (0 < leafCount && 0 <= id0) { + biggest = buffer[0][id0]; + iBiggest = 0; + } + if (bufferSize < leafCount && 0 <= id1) { + long x = buffer[1][id1]; + if (x > biggest) { + biggest = x; + iBiggest = 1; + } + } + for (int i = 2; i < currentTop + 1; i++) { + if (!isBufferEmpty(i) && 0 <= ids[i]) { + long x = buffer[i][ids[i]]; + if (x > biggest) { + biggest = x; + iBiggest = i; + } + } + } + return iBiggest; + } + + + /** + * Based on the number of elements inserted we can easily know if a buffer + * is empty or not + */ + @VisibleForTesting + boolean isBufferEmpty(int level) { + if (level == currentTop) { + return false; // root buffer (if present) is always full + } else { + long levelWeight = 1 << (level - 1); + return (((count - leafCount) / bufferSize) & levelWeight) == 0; + } + } + + /** + * Return the weight of the level ie. 2^(i-1) except for the two tree + * leaves (weight=1) and for the root + */ + private int weight(int level) { + if (level == 0) { + return 1; + } else if (level == maxDepth) { + return rootWeight; + } else { + return 1 << (level - 1); + } + } + + private void allocate(int i) { + if (buffer[i] == null) { + buffer[i] = new long[bufferSize]; + } + } + + /** + * Recursively collapse the buffers of the tree. + * Upper buffers will be allocated on first access in this method. 
+ */ + private void recCollapse(long[] buf, int level) { + // if we reach the root, we can't add more buffer + if (level == maxDepth) { + // weight() return the weight of the root, in that case we need the + // weight of merge result + int mergeWeight = 1 << (level - 1); + int idx = level % 2; + long[] merged = bufferPool[idx]; + long[] tmp = buffer[level]; + collapse(buf, mergeWeight, buffer[level], rootWeight, merged); + buffer[level] = merged; + bufferPool[idx] = tmp; + rootWeight += mergeWeight; + } else { + allocate(level + 1); // lazy allocation (if needed) + if (level == currentTop) { + // if we reach the top, add a new buffer + collapse1(buf, buffer[level], buffer[level + 1]); + currentTop += 1; + rootWeight *= 2; + } else if (isBufferEmpty(level + 1)) { + // if the upper buffer is empty, use it + collapse1(buf, buffer[level], buffer[level + 1]); + } else { + // it the upper buffer isn't empty, collapse with it + long[] merged = bufferPool[level % 2]; + collapse1(buf, buffer[level], merged); + recCollapse(merged, level + 1); + } + } + } + + /** + * collapse two sorted Arrays of different weight + * ex: [2,5,7] weight 2 and [3,8,9] weight 3 + * weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9] + * sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9] + * select every nth elems = [3,7,9] (n = sum weight / 2) + */ + @VisibleForTesting + static void collapse( + long[] left, + int leftWeight, + long[] right, + int rightWeight, + long[] output) { + + int totalWeight = leftWeight + rightWeight; + int halfTotalWeight = (totalWeight / 2) - 1; + int i = 0, j = 0, k = 0, cnt = 0; + + int weight; + long smallest; + + while (i < left.length || j < right.length) { + if (i < left.length && (j == right.length || left[i] < right[j])) { + smallest = left[i]; + weight = leftWeight; + i++; + } else { + smallest = right[j]; + weight = rightWeight; + j++; + } + + int cur = (cnt + halfTotalWeight) / totalWeight; + cnt += weight; + int next = (cnt + halfTotalWeight) / totalWeight; + + for(; cur < next; cur++) { + output[k] = smallest; + k++; + } + } + } + +/** + * Optimized version of collapse for collapsing two array of the same weight + * (which is what we want most of the time) + */ + private static void collapse1( + long[] left, + long[] right, + long[] output) { + + int i = 0, j = 0, k = 0, cnt = 0; + long smallest; + + while (i < left.length || j < right.length) { + if (i < left.length && (j == right.length || left[i] < right[j])) { + smallest = left[i]; + i++; + } else { + smallest = right[j]; + j++; + } + if (cnt % 2 == 1) { + output[k] = smallest; + k++; + } + cnt++; + } + } +}
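With the class complete, a usage sketch may be helpful. It assumes this commons stats package is on the classpath and uses only constructors and methods defined above; Precision is the companion value class referenced by the precision-based constructor.

import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import com.twitter.common.stats.ApproximateHistogram;
import com.twitter.common.stats.Histogram;
import com.twitter.common.stats.Precision;

public final class ApproximateHistogramExample {
  public static void main(String[] args) {
    // Target roughly 2% error over an expected stream of 100,000 elements.
    ApproximateHistogram latencies = new ApproximateHistogram(new Precision(0.02, 100 * 1000));

    // Alternatively, bound memory instead of precision (at least 1 KB is required).
    ApproximateHistogram bounded = new ApproximateHistogram(Amount.of(4L, Data.KB));

    for (long i = 1; i <= 100 * 1000; i++) {
      latencies.add(i);
      bounded.add(i);
    }

    long p50 = latencies.getQuantile(0.5);
    long[] tail = latencies.getQuantiles(new double[] {0.9, 0.99});

    // A merged view answers quantile queries across several histograms without copying data.
    Histogram merged = ApproximateHistogram.merge(new ApproximateHistogram[] {latencies, bounded});

    System.out.printf("p50=%d p90=%d p99=%d mergedP99=%d%n",
        p50, tail[0], tail[1], merged.getQuantile(0.99));
  }
}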
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/CounterMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/CounterMap.java b/commons/src/main/java/com/twitter/common/stats/CounterMap.java new file mode 100644 index 0000000..fb4d7eb --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/CounterMap.java @@ -0,0 +1,141 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +/** + * A map from a key type to integers. This simplifies the process of storing counters for multiple + * values of the same type. + */ +public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable { + private final Map<K, Integer> map = Maps.newHashMap(); + + private static Logger log = Logger.getLogger(CounterMap.class.getName()); + + /** + * Increments the counter value associated with {@code key}, and returns the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + public int incrementAndGet(K key) { + return incrementAndGet(key, 1); + } + + /** + * Increments the value associated with {@code key} by {@code value}, returning the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + public int incrementAndGet(K key, int count) { + Integer value = map.get(key); + if (value == null) { + value = 0; + } + int newValue = count + value; + map.put(key, newValue); + return newValue; + } + + /** + * Gets the value associated with a key. + * + * @param key The key to look up. + * @return The counter value stored for {@code key}, or 0 if no mapping exists. + */ + public int get(K key) { + if (!map.containsKey(key)) { + return 0; + } + + return map.get(key); + } + + /** + * Assigns a value to a key. + * + * @param key The key to assign a value to. + * @param newValue The value to assign. + */ + public void set(K key, int newValue) { + Preconditions.checkNotNull(key); + map.put(key, newValue); + } + + /** + * Resets the value for {@code key}. This will remove the key from the counter. + * + * @param key The key to reset. + */ + public void reset(K key) { + map.remove(key); + } + + /** + * Gets the number of entries stored in the map. + * + * @return The size of the map. 
+ */ + public int size() { + return map.size(); + } + + /** + * Gets an iterator for the mapped values. + * + * @return Iterator for mapped values. + */ + public Iterator<Map.Entry<K, Integer>> iterator() { + return map.entrySet().iterator(); + } + + public Collection<Integer> values() { + return map.values(); + } + + public Set<K> keySet() { + return map.keySet(); + } + + public String toString() { + StringBuilder strVal = new StringBuilder(); + for (Map.Entry<K, Integer> entry : this) { + strVal.append(entry.getKey().toString()).append(": ").append(entry.getValue()).append('\n'); + } + return strVal.toString(); + } + + public Map<K, Integer> toMap() { + return map; + } + + @Override + public CounterMap<K> clone() { + CounterMap<K> newInstance = new CounterMap<K>(); + newInstance.map.putAll(map); + return newInstance; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java b/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java new file mode 100644 index 0000000..547a1ac --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java @@ -0,0 +1,92 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import java.util.Map; + +/** + * Same as CounterMap<K>, but also keeps track of the item with the highest count. + */ +public class CounterMapWithTopKey<K> extends CounterMap<K> { + + private K mostCommonKey = null; + + /** + * Updates the most common key, if needed. + * + * @param key The key to check. + * @param count The count for the key. + * @return The count. + */ + private int updateMostCommon(K key, int count) { + if (count > get(mostCommonKey)) { + mostCommonKey = key; + } + return count; + } + + /** + * Increments the counter value associated with {@code key}, and returns the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + @Override + public int incrementAndGet(K key) { + return updateMostCommon(key, super.incrementAndGet(key)); + } + + /** + * Assigns a value to a key. + * + * @param key The key to assign a value to. + * @param newValue The value to assign. + */ + @Override + public void set(K key, int newValue) { + super.set(key, updateMostCommon(key, newValue)); + } + + /** + * Resets the value for {@code key}. This will simply set the stored value to 0. + * The most common key is updated by scanning the entire map. + * + * @param key The key to reset. 
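A quick sketch of CounterMap in use for word counting; it exercises only the methods defined in the class above.

import com.twitter.common.stats.CounterMap;

public final class CounterMapExample {
  public static void main(String[] args) {
    CounterMap<String> wordCounts = new CounterMap<String>();
    for (String word : new String[] {"to", "be", "or", "not", "to", "be"}) {
      wordCounts.incrementAndGet(word);
    }
    wordCounts.incrementAndGet("question", 3); // increment by an explicit amount

    System.out.println(wordCounts.get("to"));      // 2
    System.out.println(wordCounts.get("missing")); // 0, since no mapping exists
    wordCounts.reset("or");                        // removes the key entirely
    System.out.println(wordCounts.size());         // 4 keys remain: to, be, not, question
  }
}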
+ */ + @Override + public void reset(K key) { + super.reset(key); + for (Map.Entry<K, Integer> entry : this) { + updateMostCommon(entry.getKey(), entry.getValue()); + } + } + + /** + * + * @return The key with the highest count in the map. If multiple keys have this count, return + * an arbitrary one. + */ + public K getMostCommonKey() { + return mostCommonKey; + } + + @Override + public String toString() { + return new StringBuilder(super.toString()).append(String.format("Most common key: %s\n", + mostCommonKey.toString())).toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Elapsed.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Elapsed.java b/commons/src/main/java/com/twitter/common/stats/Elapsed.java new file mode 100644 index 0000000..dfe6c18 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Elapsed.java @@ -0,0 +1,70 @@ +package com.twitter.common.stats; + +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; + +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +/** + * A stat that exports the amount of time since it was last reset. + * + * @author William Farner + */ +public class Elapsed { + + private final Ticker ticker; + private final AtomicLong lastEventNs = new AtomicLong(); + + /** + * Calls {@link #Elapsed(String, Time)} using a default granularity of nanoseconds. + * + * @param name Name of the stat to export. + */ + public Elapsed(String name) { + this(name, Time.NANOSECONDS); + } + + /** + * Equivalent to calling {@link #Elapsed(String, Time, Ticker)} passing {@code name}, + * {@code granularity} and {@link com.google.common.base.Ticker#systemTicker()}. + * <br/> + * @param name Name of the stat to export. + * @param granularity Time unit granularity to export. + */ + public Elapsed(String name, Time granularity) { + this(name, granularity, Ticker.systemTicker()); + } + + /** + * Creates and exports a new stat that maintains the difference between the tick time + * and the time since it was last reset. Upon export, the counter will act as though it were just + * reset. + * <br/> + * @param name Name of stat to export + * @param granularity Time unit granularity to export. 
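A minimal sketch of Elapsed in use; the stat name is illustrative, and the exported value counts up from the most recent reset() in the requested granularity.

import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Elapsed;

public final class ElapsedExample {
  // Exports "job_last_run_elapsed", the number of seconds since the last reset().
  private static final Elapsed SINCE_LAST_RUN = new Elapsed("job_last_run_elapsed", Time.SECONDS);

  public static void runJob() {
    // ... perform the periodic work ...
    SINCE_LAST_RUN.reset(); // the exported stat starts counting up from zero again
  }
}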
+ * @param ticker Ticker implementation + */ + public Elapsed(String name, final Time granularity, final Ticker ticker) { + MorePreconditions.checkNotBlank(name); + Preconditions.checkNotNull(granularity); + this.ticker = Preconditions.checkNotNull(ticker); + + reset(); + + Stats.export(new StatImpl<Long>(name) { + @Override public Long read() { + return Amount.of(ticker.read() - lastEventNs.get(), Time.NANOSECONDS).as(granularity); + } + }); + } + + public void reset() { + lastEventNs.set(ticker.read()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Entropy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Entropy.java b/commons/src/main/java/com/twitter/common/stats/Entropy.java new file mode 100644 index 0000000..1ae9963 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Entropy.java @@ -0,0 +1,54 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.base.Preconditions; + +/** + * Calculate the entropy of a discrete distribution of <T>. + * + * @author Gilad Mishne + */ +public class Entropy<T> { + private final CounterMap<T> counts = new CounterMap<T>(); + private int total = 0; + + private static double Log2(double n) { + return Math.log(n) / Math.log(2); + } + + public Entropy(Iterable<T> elements) { + Preconditions.checkNotNull(elements); + for (T element : elements) { + counts.incrementAndGet(element); + total++; + } + } + + public double entropy() { + double entropy = 0; + for (int count: counts.values()) { + double prob = (double) count / total; + entropy -= prob * Log2(prob); + } + return entropy; + } + + public double perplexity() { + return Math.pow(2, entropy()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Histogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Histogram.java b/commons/src/main/java/com/twitter/common/stats/Histogram.java new file mode 100644 index 0000000..30e69d5 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Histogram.java @@ -0,0 +1,46 @@ +// ================================================================================================= +// Copyright 2013 Twitter, Inc. 
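The Entropy helper above computes H = -sum(p * log2(p)) over the empirical distribution of its input and exposes 2^H as perplexity. For example:

import java.util.Arrays;

import com.twitter.common.stats.Entropy;

public final class EntropyExample {
  public static void main(String[] args) {
    // Four equally likely symbols: entropy = 2 bits, perplexity = 4.
    Entropy<String> uniform = new Entropy<String>(Arrays.asList("a", "b", "c", "d"));
    System.out.println(uniform.entropy());    // ~2.0
    System.out.println(uniform.perplexity()); // ~4.0

    // A skewed distribution carries less information per observation.
    Entropy<String> skewed = new Entropy<String>(Arrays.asList("a", "a", "a", "b"));
    System.out.println(skewed.entropy());     // ~0.811
  }
}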
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +/** + * An interface for Histogram + */ +public interface Histogram { + + /** + * Add an entry into the histogram. + * @param x the value to insert. + */ + void add(long x); + + /** + * Clear the histogram. + */ + void clear(); + + /** + * Return the current quantile of the histogram. + * @param quantile value to compute. + */ + long getQuantile(double quantile); + + /** + * Return the quantiles of the histogram. + * @param quantiles array of values to compute. + */ + long[] getQuantiles(double[] quantiles); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Histograms.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Histograms.java b/commons/src/main/java/com/twitter/common/stats/Histograms.java new file mode 100644 index 0000000..f45a1c7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Histograms.java @@ -0,0 +1,26 @@ +package com.twitter.common.stats; + +/** + * Helper class containing only static methods + */ +public final class Histograms { + + private Histograms() { + /* Disable */ + } + + /** + * Helper method that return an array of quantiles + * @param h the histogram to query + * @param quantiles an array of double representing the quantiles + * @return the array of computed quantiles + */ + public static long[] extractQuantiles(Histogram h, double[] quantiles) { + long[] results = new long[quantiles.length]; + for (int i = 0; i < results.length; i++) { + double q = quantiles[i]; + results[i] = h.getQuantile(q); + } + return results; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/JvmStats.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/JvmStats.java b/commons/src/main/java/com/twitter/common/stats/JvmStats.java new file mode 100644 index 0000000..610409b --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/JvmStats.java @@ -0,0 +1,243 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import java.lang.management.ClassLoadingMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.RuntimeMXBean; +import java.lang.management.ThreadMXBean; +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; + +import com.google.common.collect.Iterables; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Data; +import com.twitter.common.quantity.Time; + +/** + * Convenience class to export statistics about the JVM. + */ +public class JvmStats { + + private static final long BYTES_PER_MB = Amount.of(1L, Data.MB).as(Data.BYTES); + private static final double SECS_PER_NANO = + ((double) 1) / Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS); + + private JvmStats() { + // Utility class. + } + + /** + * Exports stats related to the JVM and runtime environment. + */ + public static void export() { + final OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean(); + if (osMbean instanceof com.sun.management.OperatingSystemMXBean) { + final com.sun.management.OperatingSystemMXBean sunOsMbean = + (com.sun.management.OperatingSystemMXBean) osMbean; + + Stats.exportAll( + ImmutableList.<Stat<? extends Number>>builder() + .add(new StatImpl<Long>("system_free_physical_memory_mb") { + @Override public Long read() { + return sunOsMbean.getFreePhysicalMemorySize() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("system_free_swap_mb") { + @Override public Long read() { + return sunOsMbean.getFreeSwapSpaceSize() / BYTES_PER_MB; + } + }) + .add( + Rate.of( + new StatImpl<Long>("process_cpu_time_nanos") { + @Override public Long read() { + return sunOsMbean.getProcessCpuTime(); + } + }).withName("process_cpu_cores_utilized").withScaleFactor(SECS_PER_NANO).build()) + .build()); + } + if (osMbean instanceof com.sun.management.UnixOperatingSystemMXBean) { + final com.sun.management.UnixOperatingSystemMXBean unixOsMbean = + (com.sun.management.UnixOperatingSystemMXBean) osMbean; + + Stats.exportAll(ImmutableList.<Stat<? extends Number>>builder() + .add(new StatImpl<Long>("process_max_fd_count") { + @Override public Long read() { return unixOsMbean.getMaxFileDescriptorCount(); } + }).add(new StatImpl<Long>("process_open_fd_count") { + @Override public Long read() { return unixOsMbean.getOpenFileDescriptorCount(); } + }).build()); + } + + final Runtime runtime = Runtime.getRuntime(); + final ClassLoadingMXBean classLoadingBean = ManagementFactory.getClassLoadingMXBean(); + final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); + final ThreadMXBean threads = ManagementFactory.getThreadMXBean(); + final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + + Stats.exportAll(ImmutableList.<Stat<? 
extends Number>>builder() + .add(new StatImpl<Long>("jvm_time_ms") { + @Override public Long read() { return System.currentTimeMillis(); } + }) + .add(new StatImpl<Integer>("jvm_available_processors") { + @Override public Integer read() { return runtime.availableProcessors(); } + }) + .add(new StatImpl<Long>("jvm_memory_free_mb") { + @Override public Long read() { return runtime.freeMemory() / BYTES_PER_MB; } + }) + .add(new StatImpl<Long>("jvm_memory_max_mb") { + @Override public Long read() { return runtime.maxMemory() / BYTES_PER_MB; } + }) + .add(new StatImpl<Long>("jvm_memory_mb_total") { + @Override public Long read() { return runtime.totalMemory() / BYTES_PER_MB; } + }) + .add(new StatImpl<Integer>("jvm_class_loaded_count") { + @Override public Integer read() { return classLoadingBean.getLoadedClassCount(); } + }) + .add(new StatImpl<Long>("jvm_class_total_loaded_count") { + @Override public Long read() { return classLoadingBean.getTotalLoadedClassCount(); } + }) + .add(new StatImpl<Long>("jvm_class_unloaded_count") { + @Override public Long read() { return classLoadingBean.getUnloadedClassCount(); } + }) + .add(new StatImpl<Long>("jvm_gc_collection_time_ms") { + @Override public Long read() { + long collectionTimeMs = 0; + for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { + collectionTimeMs += bean.getCollectionTime(); + } + return collectionTimeMs; + } + }) + .add(new StatImpl<Long>("jvm_gc_collection_count") { + @Override public Long read() { + long collections = 0; + for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { + collections += bean.getCollectionCount(); + } + return collections; + } + }) + .add(new StatImpl<Long>("jvm_memory_heap_mb_used") { + @Override public Long read() { + return memoryBean.getHeapMemoryUsage().getUsed() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_memory_heap_mb_committed") { + @Override public Long read() { + return memoryBean.getHeapMemoryUsage().getCommitted() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_memory_heap_mb_max") { + @Override public Long read() { + return memoryBean.getHeapMemoryUsage().getMax() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_memory_non_heap_mb_used") { + @Override public Long read() { + return memoryBean.getNonHeapMemoryUsage().getUsed() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_memory_non_heap_mb_committed") { + @Override public Long read() { + return memoryBean.getNonHeapMemoryUsage().getCommitted() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_memory_non_heap_mb_max") { + @Override public Long read() { + return memoryBean.getNonHeapMemoryUsage().getMax() / BYTES_PER_MB; + } + }) + .add(new StatImpl<Long>("jvm_uptime_secs") { + @Override public Long read() { return runtimeMXBean.getUptime() / 1000; } + }) + .add(new StatImpl<Double>("system_load_avg") { + @Override public Double read() { return osMbean.getSystemLoadAverage(); } + }) + .add(new StatImpl<Integer>("jvm_threads_peak") { + @Override public Integer read() { return threads.getPeakThreadCount(); } + }) + .add(new StatImpl<Long>("jvm_threads_started") { + @Override public Long read() { return threads.getTotalStartedThreadCount(); } + }) + .add(new StatImpl<Integer>("jvm_threads_daemon") { + @Override public Integer read() { return threads.getDaemonThreadCount(); } + }) + .add(new StatImpl<Integer>("jvm_threads_active") { + @Override public Integer read() { return threads.getThreadCount(); } + }) + .build()); + + // Export per memory pool 
gc time and cycle count like Ostrich + // This is based on code in Bridcage: https://cgit.twitter.biz/birdcage/tree/ \ + // ostrich/src/main/scala/com/twitter/ostrich/stats/StatsCollection.scala + Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(), + new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){ + @Override + public Stat<? extends Number> apply(final GarbageCollectorMXBean gcMXBean) { + return new StatImpl<Long>( + "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + "_collection_count") { + @Override public Long read() { + return gcMXBean.getCollectionCount(); + } + }; + } + } + )); + + Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(), + new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){ + @Override + public Stat<? extends Number> apply(final GarbageCollectorMXBean gcMXBean) { + return new StatImpl<Long>( + "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + "_collection_time_ms") { + @Override public Long read() { + return gcMXBean.getCollectionTime(); + } + }; + } + } + )); + + Stats.exportString( + new StatImpl<String>("jvm_input_arguments") { + @Override public String read() { + return runtimeMXBean.getInputArguments().toString(); + } + } + ); + + for (final String property : System.getProperties().stringPropertyNames()) { + Stats.exportString( + new StatImpl<String>("jvm_prop_" + Stats.normalizeName(property)) { + @Override public String read() { return System.getProperty(property); } + }); + } + + for (final Map.Entry<String, String> environmentVariable : System.getenv().entrySet()) { + Stats.exportString( + new StatImpl<String>("system_env_" + Stats.normalizeName(environmentVariable.getKey())) { + @Override public String read() { return environmentVariable.getValue(); } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/MovingAverage.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/MovingAverage.java b/commons/src/main/java/com/twitter/common/stats/MovingAverage.java new file mode 100644 index 0000000..7ab4302 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/MovingAverage.java @@ -0,0 +1,71 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import java.util.concurrent.LinkedBlockingDeque; + +import com.google.common.base.Preconditions; + +/** + * Function to compute the moving average of a time series. 
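MovingAverage (implemented below) wraps an existing numeric Stat and exports a windowed mean under the name <input>_avg. A usage sketch, in which the wrapped queue_depth stat is hypothetical:

import com.twitter.common.stats.MovingAverage;
import com.twitter.common.stats.Stat;
import com.twitter.common.stats.StatImpl;

public final class MovingAverageExample {
  public static void main(String[] args) {
    // Hypothetical input stat; any exported numeric Stat works here.
    Stat<Long> queueDepth = new StatImpl<Long>("queue_depth") {
      @Override public Long read() {
        return (long) (Math.random() * 100);
      }
    };

    // Exports "queue_depth_avg": the mean of the last 10 sampled values (the default window).
    MovingAverage<Long> avg = MovingAverage.of(queueDepth);
    System.out.println(avg.getName()); // queue_depth_avg
  }
}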
+ * + * @author William Farner + */ +public class MovingAverage<T extends Number> extends SampledStat<Double> { + + private static final int DEFAULT_WINDOW = 10; + private final Stat<T> input; + + private final LinkedBlockingDeque<T> samples; + private double sampleSum = 0; + + private MovingAverage(String name, Stat<T> input, int windowSize) { + super(name, 0d); + Preconditions.checkArgument(windowSize > 1); + + this.input = Preconditions.checkNotNull(input); + this.samples = new LinkedBlockingDeque<T>(windowSize); + Stats.export(input); + } + + public static <T extends Number> MovingAverage<T> of(Stat<T> input) { + return MovingAverage.of(input, DEFAULT_WINDOW); + } + + public static <T extends Number> MovingAverage<T> of(Stat<T> input, int windowSize) { + return MovingAverage.of(String.format("%s_avg", input.getName()), input, windowSize); + } + + public static <T extends Number> MovingAverage<T> of(String name, Stat<T> input, + int windowSize) { + return new MovingAverage<T>(name, input, windowSize); + } + + @Override + public Double doSample() { + T sample = input.read(); + + if (samples.remainingCapacity() == 0) { + sampleSum -= samples.removeLast().doubleValue(); + } + + samples.addFirst(sample); + sampleSum += sample.doubleValue(); + + return sampleSum / samples.size(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java b/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java new file mode 100644 index 0000000..2319128 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java @@ -0,0 +1,100 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +import java.util.concurrent.LinkedBlockingDeque; + +import com.twitter.common.base.MorePreconditions; + + +/** + * Delta over the most recent k sample periods. + * + * If you use this class with a counter, you can get the cumulation of counts in a sliding window. + * + * One sample period is the time in between doSample() calls. 
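A usage sketch for MovingWindowDelta (implemented below), tracking how much a monotonically increasing counter grew over the last 60 sample periods; how often a sample period elapses depends on how the stat sampler is scheduled.

import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Supplier;

import com.twitter.common.stats.MovingWindowDelta;

public final class MovingWindowDeltaExample {
  private static final AtomicLong REQUESTS = new AtomicLong();

  // Exports "requests_windowed_delta": the counter's increase over the default 60-sample window.
  private static final MovingWindowDelta<Long> WINDOWED = MovingWindowDelta.of(
      "requests_windowed_delta",
      new Supplier<Long>() {
        @Override public Long get() {
          return REQUESTS.get();
        }
      });

  public static void handleRequest() {
    REQUESTS.incrementAndGet();
    // ... serve the request ...
  }
}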
+ * + * @author Feng Zhuge + */ +public class MovingWindowDelta<T extends Number> extends SampledStat<Long> { + private static final int DEFAULT_WINDOW_SIZE = 60; + private final LinkedBlockingDeque<Long> deltaSeries; + private final Supplier<T> inputAccessor; + long sumDelta = 0l; + long lastInput = 0l; + + private MovingWindowDelta(String name, Supplier<T> inputAccessor, int windowSize) { + super(name, 0l); + + Preconditions.checkArgument(windowSize >= 1); + Preconditions.checkNotNull(inputAccessor); + MorePreconditions.checkNotBlank(name); + + deltaSeries = new LinkedBlockingDeque<Long>(windowSize); + this.inputAccessor = inputAccessor; + + Stats.export(this); + } + + /** + * Create a new MovingWindowDelta instance. + * + * @param name The name of the value to be tracked. + * @param inputAccessor The accessor of the value. + * @param windowSize How many sample periods shall we use to calculate delta. + * @param <T> The type of the value. + * @return The created MovingWindowSum instance. + */ + public static <T extends Number> MovingWindowDelta<T> of( + String name, Supplier<T> inputAccessor, int windowSize) { + return new MovingWindowDelta<T>(name, inputAccessor, windowSize); + } + + /** + * Create a new MovingWindowDelta instance using the default window size (currently 60). + * + * @param name The name of the value to be tracked. + * @param inputAccessor The accessor of the value. + * @param <T> The type of the value. + * @return The created MovingWindowSum instance. + */ + public static <T extends Number> MovingWindowDelta<T> of(String name, Supplier<T> inputAccessor) { + return of(name, inputAccessor, DEFAULT_WINDOW_SIZE); + } + + @Override + public Long doSample() { + long lastDelta = 0l; + if (deltaSeries.remainingCapacity() == 0) { + lastDelta = deltaSeries.removeFirst(); + } + + long newInput = inputAccessor.get().longValue(); + long newDelta = newInput - lastInput; + lastInput = newInput; + + deltaSeries.addLast(newDelta); + + sumDelta += newDelta - lastDelta; + + return sumDelta; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java b/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java new file mode 100644 index 0000000..c34d1ae --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java @@ -0,0 +1,128 @@ +// ================================================================================================= +// Copyright 2012 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.stats; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.collect.Maps; + +import com.twitter.common.application.ShutdownRegistry; +import com.twitter.common.base.Closure; +import com.twitter.common.base.Command; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Stat exporter that extracts numeric {@link Stat}s from the {@link Stats} system, and exports them + * via a caller-defined sink. + * + * @author William Farner + */ +public class NumericStatExporter { + + private static final Logger LOG = Logger.getLogger(NumericStatExporter.class.getName()); + + private final ScheduledExecutorService executor; + private final Amount<Long, Time> exportInterval; + private final Closure<Map<String, ? extends Number>> exportSink; + + private final Runnable exporter; + + /** + * Creates a new numeric stat exporter that will export to the specified sink. + * + * @param exportSink Consumes stats. + * @param executor Executor to handle export thread. + * @param exportInterval Export period. + */ + public NumericStatExporter(final Closure<Map<String, ? extends Number>> exportSink, + ScheduledExecutorService executor, Amount<Long, Time> exportInterval) { + checkNotNull(exportSink); + this.executor = checkNotNull(executor); + this.exportInterval = checkNotNull(exportInterval); + this.exportSink = exportSink; + + exporter = new Runnable() { + @Override public void run() { + exportSink.execute(Maps.transformValues( + Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), READ_STAT)); + } + }; + } + + /** + * Starts the stat exporter. + * + * @param shutdownRegistry Shutdown hook registry to allow the exporter to cleanly halt. + */ + public void start(ShutdownRegistry shutdownRegistry) { + long intervalSecs = exportInterval.as(Time.SECONDS); + executor.scheduleAtFixedRate(exporter, intervalSecs, intervalSecs, TimeUnit.SECONDS); + + shutdownRegistry.addAction(new Command() { + @Override public void execute() { + stop(); + exportSink.execute(Maps.transformValues( + Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), SAMPLE_AND_READ_STAT)); + } + }); + } + + /** + * Stops the stat exporter. Once stopped, it may be started again by calling + * {@link #start(ShutdownRegistry)}. + */ + public void stop() { + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.severe("Failed to stop stat exporter."); + } + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + public static final Function<Stat<?>, String> GET_NAME = new Function<Stat<?>, String>() { + @Override public String apply(Stat<?> stat) { + return stat.getName(); + } + }; + + public static final Function<Stat<? extends Number>, Number> READ_STAT = + new Function<Stat<? extends Number>, Number>() { + @Override public Number apply(Stat<? extends Number> stat) { + return stat.read(); + } + }; + + private static final Function<RecordingStat<? extends Number>, Number> SAMPLE_AND_READ_STAT = + new Function<RecordingStat<? 
extends Number>, Number>() { + @Override public Number apply(RecordingStat<? extends Number> stat) { + return stat.sample(); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Percentile.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Percentile.java b/commons/src/main/java/com/twitter/common/stats/Percentile.java new file mode 100644 index 0000000..01b05df --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Percentile.java @@ -0,0 +1,201 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; + +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.util.Sampler; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; + +import javax.annotation.Nullable; + +/** + * A stats tracker to export percentiles of inputs based on a sampling rate. + * + * A percentile tracker will randomly sample recorded events with the given sampling rate, and + * will automatically register variables to track the percentiles requested. + * Percentiles are calculated based on the K most recent sampling windows, where each sampling + * window has the recorded events for a sampling period. + * + * @author William Farner + */ +public class Percentile<T extends Number & Comparable<T>> { + + @VisibleForTesting + static final int MAX_BUFFER_SIZE = 10001; + + private final Sampler sampler; + + private final Map<Double, SampledStat<Double>> statsByPercentile; + @VisibleForTesting + final LinkedList<T> samples = Lists.newLinkedList(); + + private final LinkedBlockingDeque<ArrayList<T>> sampleQueue; + private final ArrayList<T> allSamples = new ArrayList<T>(); + + /** + * Creates a new percentile tracker. + * + * @param name The name of the value whose percentile is being tracked. + * @param samplePercent The percent of events to sample [0, 100]. + * @param percentiles The percentiles to track. + */ + public Percentile(String name, float samplePercent, double... percentiles) { + this(name, new Sampler(samplePercent), percentiles); + } + + /** + * Creates a new percentile tracker. + * + * @param name The name of the value whose percentile is being tracked. 
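Wiring up the NumericStatExporter above is a matter of supplying a sink, an executor, and an interval; the logging sink and the sixty-second period here are illustrative, and the ShutdownRegistry instance is assumed to be provided by the application framework.

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.base.Closure;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.NumericStatExporter;

public final class StatExportSetup {
  public static void startExporter(ShutdownRegistry shutdownRegistry) {
    Closure<Map<String, ? extends Number>> sink = new Closure<Map<String, ? extends Number>>() {
      @Override public void execute(Map<String, ? extends Number> stats) {
        // Stand-in sink; a real deployment would push these values to a metrics backend.
        System.out.println("exporting " + stats.size() + " numeric stats");
      }
    };

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    NumericStatExporter exporter =
        new NumericStatExporter(sink, executor, Amount.of(60L, Time.SECONDS));

    // Exports every 60 seconds; the registered shutdown action flushes one final snapshot.
    exporter.start(shutdownRegistry);
  }
}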
+   * @param sampler The sampler to use for selecting recorded events.
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, Sampler sampler, double... percentiles) {
+    this(name, 1, sampler, percentiles);
+  }
+
+  /**
+   * Creates a new percentile tracker.
+   *
+   * A percentile tracker will randomly sample recorded events with the given sampling rate, and
+   * will automatically register variables to track the percentiles requested.
+   * Each time the percentiles are sampled, the values recorded since the previous sample are
+   * flushed into a new sampling window, and the percentiles are computed over the
+   * {@code numSampleWindows} most recent windows.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param numSampleWindows How many sampling windows are used for calculation.
+   * @param sampler The sampler to use for selecting recorded events. You may set sampler to null
+   *     to sample all input.
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, int numSampleWindows,
+      @Nullable Sampler sampler, double... percentiles) {
+    MorePreconditions.checkNotBlank(name);
+    Preconditions.checkArgument(numSampleWindows >= 1, "Must have one or more sample windows.");
+    Preconditions.checkNotNull(percentiles);
+    Preconditions.checkArgument(percentiles.length > 0, "Must specify at least one percentile.");
+
+    this.sampler = sampler;
+
+    sampleQueue = new LinkedBlockingDeque<ArrayList<T>>(numSampleWindows);
+
+    ImmutableMap.Builder<Double, SampledStat<Double>> builder =
+        new ImmutableMap.Builder<Double, SampledStat<Double>>();
+
+    for (int i = 0; i < percentiles.length; i++) {
+      boolean sortFirst = i == 0;
+      String statName = String.format("%s_%s_percentile", name, percentiles[i])
+          .replace('.', '_');
+
+      SampledStat<Double> stat = new PercentileVar(statName, percentiles[i], sortFirst);
+      Stats.export(stat);
+      builder.put(percentiles[i], stat);
+    }
+
+    statsByPercentile = builder.build();
+  }
+
+  /**
+   * Gets the variables associated with this percentile tracker.
+   *
+   * @return A map from tracked percentile to the Stat corresponding to it.
+   */
+  public Map<Double, ? extends Stat<?>> getPercentiles() {
+    return ImmutableMap.copyOf(statsByPercentile);
+  }
+
+  @VisibleForTesting
+  SampledStat<Double> getPercentile(double percentile) {
+    return statsByPercentile.get(percentile);
+  }
+
+  /**
+   * Records an event.
+   *
+   * @param value The value to record if it is randomly selected based on the sampling rate.
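+   *     (For instance, values recorded through a tracker built as
+   *     {@code new Percentile<Long>("request_latency_ms", 10, 50, 90, 99)} would be sampled at
+   *     roughly 10% and exported as 50th/90th/99th percentile stats; the name and numbers here
+   *     are illustrative only.)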
+ */ + public void record(T value) { + if (sampler == null || sampler.select()) { + synchronized (samples) { + samples.addLast(value); + + while (samples.size() > MAX_BUFFER_SIZE) samples.removeFirst(); + } + } + } + + private class PercentileVar extends SampledStat<Double> { + private final double percentile; + private final boolean sortFirst; + + PercentileVar(String name, double percentile, boolean sortFirst) { + super(name, 0d); + this.percentile = percentile; + this.sortFirst = sortFirst; + } + + @Override + public Double doSample() { + synchronized (samples) { + if (sortFirst) { + if (sampleQueue.remainingCapacity() == 0) { + sampleQueue.removeFirst(); + } + sampleQueue.addLast(new ArrayList<T>(samples)); + samples.clear(); + + allSamples.clear(); + for (ArrayList<T> sample : sampleQueue) { + allSamples.addAll(sample); + } + + Collections.sort(allSamples, Ordering.<T>natural()); + } + + if (allSamples.isEmpty()) { + return 0d; + } + + int maxIndex = allSamples.size() - 1; + double selectIndex = maxIndex * percentile / 100; + selectIndex = selectIndex < 0d ? 0d : selectIndex; + selectIndex = selectIndex > maxIndex ? maxIndex : selectIndex; + + int indexLeft = (int) selectIndex; + if (indexLeft == maxIndex) { + return allSamples.get(indexLeft).doubleValue(); + } + + double residue = selectIndex - indexLeft; + return allSamples.get(indexLeft).doubleValue() * (1 - residue) + + allSamples.get(indexLeft + 1).doubleValue() * residue; + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/PipelineStats.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/PipelineStats.java b/commons/src/main/java/com/twitter/common/stats/PipelineStats.java new file mode 100644 index 0000000..63fc09c --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/PipelineStats.java @@ -0,0 +1,137 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.collections.Pair; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Tracks the latency of different pipeline stages in a process. 
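+ * <p>A minimal usage sketch (the pipeline and stage names below are illustrative only):
+ * <pre>
+ *   PipelineStats pipeline = new PipelineStats("request",
+ *       ImmutableSet.of("parse", "execute"), Time.MILLISECONDS);
+ *
+ *   PipelineStats.Snapshot snapshot = pipeline.newSnapshot();
+ *   snapshot.start("parse");    // starts timing the "parse" stage
+ *   snapshot.start("execute");  // records "parse", starts "execute"
+ *   snapshot.end();             // records "execute" and the end-to-end pipeline time
+ * </pre>
+ * This creates a SlidingStats per stage ("request_parse", "request_execute") plus
+ * "request_full" for the entire pipeline.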
+ * + * @author William Farner + */ +public class PipelineStats { + private static final String FULL_PIPELINE_NAME = "full"; + + private final Time precision; + private final Clock clock; + + private final Map<String, SlidingStats> stages; + + /** + * Creates a new pipeline tracker with the given pipeline name and stages. The stage name "full" + * is reserved to represent the duration of the entire pipeline. + * + * @param pipelineName Name of the pipeline. + * @param stages Stage names. + * @param precision Precision for time interval recording. + */ + public PipelineStats(String pipelineName, Set<String> stages, Time precision) { + this(pipelineName, stages, Clock.SYSTEM_CLOCK, precision); + } + + @VisibleForTesting + PipelineStats(String pipelineName, Set<String> stages, Clock clock, Time precision) { + MorePreconditions.checkNotBlank(pipelineName); + MorePreconditions.checkNotBlank(stages); + Preconditions.checkArgument(!stages.contains(FULL_PIPELINE_NAME)); + + this.clock = Preconditions.checkNotNull(clock); + this.precision = Preconditions.checkNotNull(precision); + + this.stages = Maps.newHashMap(); + for (String stage : stages) { + this.stages.put(stage, new SlidingStats( + String.format("%s_%s", pipelineName, stage), precision.toString())); + } + this.stages.put(FULL_PIPELINE_NAME, new SlidingStats( + String.format("%s_%s", pipelineName, FULL_PIPELINE_NAME), precision.toString())); + } + + private void record(Snapshot snapshot) { + for (Pair<String, Long> stage : snapshot.stages) { + stages.get(stage.getFirst()).accumulate(stage.getSecond()); + } + } + + public Snapshot newSnapshot() { + return new Snapshot(this); + } + + @VisibleForTesting + public SlidingStats getStatsForStage(String stage) { + return stages.get(stage); + } + + public class Snapshot { + private final List<Pair<String, Long>> stages = Lists.newLinkedList(); + private final PipelineStats parent; + + private String currentStage; + private long startTime; + private long ticker; + + private Snapshot(PipelineStats parent) { + this.parent = parent; + } + + /** + * Records the duration for the current pipeline stage, and advances to the next stage. The + * stage name must be one of the stages specified in the constructor. + * + * @param stageName Name of the stage to enter. + */ + public void start(String stageName) { + record(Preconditions.checkNotNull(stageName)); + } + + private void record(String stageName) { + long now = Amount.of(clock.nowNanos(), Time.NANOSECONDS).as(precision); + if (currentStage != null) { + stages.add(Pair.of(currentStage, now - ticker)); + } else { + startTime = now; + } + + if (stageName == null) stages.add(Pair.of(FULL_PIPELINE_NAME, now - startTime)); + + ticker = now; + currentStage = stageName; + } + + /** + * Stops the pipeline, recording the interval for the last registered stage. 
+     * This is the same as calling {@link #start(String)} with {@code null}.
+     *
+     */
+    public void end() {
+      record(null);
+      parent.record(this);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Precision.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Precision.java b/commons/src/main/java/com/twitter/common/stats/Precision.java
new file mode 100644
index 0000000..2b088e7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Precision.java
@@ -0,0 +1,37 @@
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Precision expresses the maximum epsilon tolerated for a typical size of input.
+ * e.g.: Precision(0.01, 1000) expresses that we tolerate an error of 1% for 1000 entries,
+ * meaning that the maximum difference between the real quantile and the estimated one is
+ * error = 0.01 * 1000 = 10.
+ * For entries like (1 to 1000), q(0.5) will be [490 <= x <= 510] (the real q(0.5) is 500).
+ */
+public class Precision {
+  private final double epsilon;
+  private final int n;
+
+  /**
+   * Creates a Precision instance representing a precision per number of entries.
+   *
+   * @param epsilon The maximum error tolerated.
+   * @param n Size of the data set.
+   */
+  public Precision(double epsilon, int n) {
+    Preconditions.checkArgument(0.0 < epsilon, "Epsilon must be positive!");
+    Preconditions.checkArgument(1 < n, "N (expected number of elements) must be greater than 1!");
+
+    this.epsilon = epsilon;
+    this.n = n;
+  }
+
+  public double getEpsilon() {
+    return epsilon;
+  }
+
+  public int getN() {
+    return n;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java b/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
new file mode 100644
index 0000000..a3ecebb
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
@@ -0,0 +1,93 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+
+public class PrintableHistogram {
+  private double[] bucketBoundaries;
+  private int[] bucketCounts;
+  private int totalCount = 0;
+
+  /**
+   * Creates a histogram with the given bucket boundaries. The boundaries
+   * 0 and infinity are implicitly added.
+   *
+   * @param buckets Boundaries for histogram buckets.
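+   *     For example, {@code new PrintableHistogram(1, 10, 100)} (arbitrary boundaries, for
+   *     illustration) buckets values into (0, 1], (1, 10], (10, 100] and (100, Integer.MAX_VALUE].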
+ */ + public PrintableHistogram(double ... buckets) { + Preconditions.checkState(buckets[0] != 0); + + bucketBoundaries = new double[buckets.length + 2]; + bucketBoundaries[0] = 0; + bucketCounts = new int[buckets.length + 2]; + for (int i = 0; i < buckets.length; i++) { + if (i > 0) { + Preconditions.checkState(buckets[i] > buckets[i - 1], + "Bucket %f must be greater than %f.", buckets[i], buckets[i - 1]); + } + bucketCounts[i] = 0; + bucketBoundaries[i + 1] = buckets[i]; + } + + bucketBoundaries[bucketBoundaries.length - 1] = Integer.MAX_VALUE; + } + + public void addValue(double value) { + addValue(value, 1); + } + + public void addValue(double value, int count) { + Preconditions.checkState(value >= 0); + Preconditions.checkState(count >= 0); + Preconditions.checkState(bucketBoundaries.length > 1); + int bucketId = -1; + for (double boundary : bucketBoundaries) { + if (value <= boundary) { + break; + } + bucketId++; + } + + bucketId = Math.max(0, bucketId); + bucketId = Math.min(bucketCounts.length - 1, bucketId); + bucketCounts[bucketId] += count; + totalCount += count; + } + + public double getBucketRatio(int bucketId) { + Preconditions.checkState(bucketId >= 0); + Preconditions.checkState(bucketId < bucketCounts.length); + return (double) bucketCounts[bucketId] / totalCount; + } + + public String toString() { + StringBuilder display = new StringBuilder(); + display.append("Histogram: "); + for (int bucketId = 0; bucketId < bucketCounts.length - 1; bucketId++) { + display.append(String.format("\n(%g - %g]\n\t", + bucketBoundaries[bucketId], bucketBoundaries[bucketId + 1])); + for (int i = 0; i < getBucketRatio(bucketId) * 100; i++) { + display.append('#'); + } + display.append( + String.format(" %.2g%% (%d)", getBucketRatio(bucketId) * 100, bucketCounts[bucketId])); + } + + return display.toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Rate.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Rate.java b/commons/src/main/java/com/twitter/common/stats/Rate.java new file mode 100644 index 0000000..f5df7e7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Rate.java @@ -0,0 +1,149 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Ticker; + +import com.twitter.common.collections.Pair; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Function to compute a windowed per-second rate of a value. + * + * @author William Farner + */ +public class Rate<T extends Number> extends SampledStat<Double> { + + private static final int DEFAULT_WINDOW_SIZE = 1; + private static final double DEFAULT_SCALE_FACTOR = 1; + private static final long NANOS_PER_SEC = Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS); + + private final Supplier<T> inputAccessor; + private final Ticker ticker; + private final double scaleFactor; + + private final LinkedBlockingDeque<Pair<Long, Double>> samples; + + private Rate(String name, Supplier<T> inputAccessor, int windowSize, double scaleFactor, + Ticker ticker) { + super(name, 0d); + + this.inputAccessor = Preconditions.checkNotNull(inputAccessor); + this.ticker = Preconditions.checkNotNull(ticker); + samples = new LinkedBlockingDeque<Pair<Long, Double>>(windowSize); + Preconditions.checkArgument(scaleFactor != 0, "Scale factor must be non-zero!"); + this.scaleFactor = scaleFactor; + } + + public static <T extends Number> Builder<T> of(Stat<T> input) { + return new Builder<T>(input); + } + + public static Builder<Long> of(String name, Supplier<Long> input) { + return new Builder<Long>(name, input); + } + + public static Builder<AtomicInteger> of(String name, AtomicInteger input) { + return new Builder<AtomicInteger>(name, input); + } + + public static Builder<AtomicLong> of(String name, AtomicLong input) { + return new Builder<AtomicLong>(name, input); + } + + @Override + public Double doSample() { + T newSample = inputAccessor.get(); + long newTimestamp = ticker.read(); + + double rate = 0; + if (!samples.isEmpty()) { + Pair<Long, Double> oldestSample = samples.peekLast(); + + double dy = newSample.doubleValue() - oldestSample.getSecond(); + double dt = newTimestamp - oldestSample.getFirst(); + rate = dt == 0 ? 
0 : (NANOS_PER_SEC * scaleFactor * dy) / dt; + } + + if (samples.remainingCapacity() == 0) samples.removeLast(); + samples.addFirst(Pair.of(newTimestamp, newSample.doubleValue())); + + return rate; + } + + public static class Builder<T extends Number> { + + private String name; + private int windowSize = DEFAULT_WINDOW_SIZE; + private double scaleFactor = DEFAULT_SCALE_FACTOR; + private Supplier<T> inputAccessor; + private Ticker ticker = Ticker.systemTicker(); + + Builder(String name, final T input) { + this.name = name; + inputAccessor = Suppliers.ofInstance(input); + } + + Builder(String name, Supplier<T> input) { + this.name = name; + inputAccessor = input; + } + + Builder(final Stat<T> input) { + Stats.export(input); + this.name = input.getName() + "_per_sec"; + inputAccessor = new Supplier<T>() { + @Override public T get() { return input.read(); } + }; + } + + public Builder<T> withName(String name) { + this.name = name; + return this; + } + + public Builder<T> withWindowSize(int windowSize) { + this.windowSize = windowSize; + return this; + } + + public Builder<T> withScaleFactor(double scaleFactor) { + this.scaleFactor = scaleFactor; + return this; + } + + @VisibleForTesting + Builder<T> withTicker(Ticker ticker ) { + this.ticker = ticker; + return this; + } + + public Rate<T> build() { + return new Rate<T>(name, inputAccessor, windowSize, scaleFactor, ticker); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Ratio.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/Ratio.java b/commons/src/main/java/com/twitter/common/stats/Ratio.java new file mode 100644 index 0000000..7ac7842 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/Ratio.java @@ -0,0 +1,101 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +/** + * Function to compute the ratio of two time series. + * The first argument is the numerator, and the second is the denominator. Assumes that the + * timestamps of the two arguments are suitably synchronized (i.e. the ith point in the numerator + * time series corresponds with the ith point of the denominator time series). 
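+ * <p>A small example (the counters below are stand-ins for any numeric values or stats):
+ * <pre>
+ *   AtomicLong hits = new AtomicLong();
+ *   AtomicLong requests = new AtomicLong();
+ *   Stats.export(Ratio.of("cache_hit_ratio", hits, requests));
+ * </pre>
+ * Each sample divides the current numerator by the current denominator, returning 0 if the
+ * denominator is 0.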
+ * + * @author William Farner + */ +public class Ratio extends SampledStat<Double> { + + private final Supplier<Number> numeratorAccessor; + private final Supplier<Number> denominatorAccessor; + + private Ratio(String name, Supplier<Number> numeratorAccessor, + Supplier<Number> denominatorAccessor) { + super(name, 0d); + this.numeratorAccessor = Preconditions.checkNotNull(numeratorAccessor); + this.denominatorAccessor = Preconditions.checkNotNull(denominatorAccessor); + } + + public static <T extends Number> Ratio of(Stat<T> numerator, Stat<T> denominator) { + Preconditions.checkNotNull(numerator); + Preconditions.checkNotNull(denominator); + + String name = String.format("%s_per_%s", numerator.getName(), denominator.getName()); + return Ratio.of(name, numerator, denominator); + } + + public static <T extends Number> Ratio of(String name, final Stat<T> numerator, + final Stat<T> denominator) { + Preconditions.checkNotNull(numerator); + Preconditions.checkNotNull(denominator); + + Stats.export(numerator); + Stats.export(denominator); + + return new Ratio(name, + new Supplier<Number>() { + @Override public Number get() { + return numerator.read(); + } + }, + new Supplier<Number>() { + @Override public Number get() { + return denominator.read(); + } + }); + } + + public static Ratio of(String name, final Number numerator, final Number denominator) { + Preconditions.checkNotNull(numerator); + Preconditions.checkNotNull(denominator); + + return new Ratio(name, + new Supplier<Number>() { + @Override public Number get() { + return numerator; + } + }, + new Supplier<Number>() { + @Override public Number get() { + return denominator; + } + }); + } + + @Override + public Double doSample() { + double numeratorValue = numeratorAccessor.get().doubleValue(); + double denominatorValue = denominatorAccessor.get().doubleValue(); + + if ((denominatorValue == 0) + || (denominatorValue == Double.NaN) + || (numeratorValue == Double.NaN)) { + return 0d; + } + + return numeratorValue / denominatorValue; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/RecordingStat.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/stats/RecordingStat.java b/commons/src/main/java/com/twitter/common/stats/RecordingStat.java new file mode 100644 index 0000000..b65b396 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/stats/RecordingStat.java @@ -0,0 +1,36 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+/**
+ * A variable that contains information about a (possibly changing) value.
+ *
+ * @author William Farner
+ */
+interface RecordingStat<T extends Number> extends Stat<T> {
+
+  /**
+   * Called by the variable sampler when a sample is being taken. Only calls to this method should
+   * be used to store variable history.
+   *
+   * Note: if the sampling of this value depends on other variables, it is imperative that those
+   * variables' values are updated first (and available via {@link Stat#read()}).
+   *
+   * @return A new sample of the variable.
+   */
+  public T sample();
+}
