http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Entropy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Entropy.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Entropy.java
new file mode 100644
index 0000000..4e011c1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Entropy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Computes the Shannon entropy, in bits, of the distribution formed by a sequence of elements
+ * of type {@code T}.
+ *
+ * <p>Every element supplied at construction is recorded in a {@link CounterMap}; each count the
+ * map reports is then divided by the total number of elements seen to obtain a probability.
+ *
+ * @author Gilad Mishne
+ */
+public class Entropy<T> {
+  private final CounterMap<T> counts = new CounterMap<T>();
+  private int total = 0;
+
+  /**
+   * Tallies every element of {@code elements}.
+   *
+   * @param elements the observations to build the distribution from; must not be null.
+   */
+  public Entropy(Iterable<T> elements) {
+    Preconditions.checkNotNull(elements);
+    for (T observed : elements) {
+      counts.incrementAndGet(observed);
+      total++;
+    }
+  }
+
+  /** Returns the base-2 logarithm of {@code value}. */
+  private static double log2(double value) {
+    return Math.log(value) / Math.log(2);
+  }
+
+  /**
+   * Returns the negated sum of {@code p * log2(p)} over all recorded counts, where {@code p} is
+   * a count divided by the total number of elements.  When no counts are reported the result
+   * is zero.
+   */
+  public double entropy() {
+    double bits = 0;
+    for (int tally : counts.values()) {
+      double probability = tally / (double) total;
+      bits -= probability * log2(probability);
+    }
+    return bits;
+  }
+
+  /** Returns two raised to the power of {@link #entropy()}. */
+  public double perplexity() {
+    return Math.pow(2, entropy());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Histogram.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Histogram.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Histogram.java
new file mode 100644
index 0000000..53c976d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Histogram.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * Contract for a histogram that accumulates {@code long} values and answers quantile queries
+ * over the values added since the last {@link #clear()}.
+ */
+public interface Histogram {
+
+  /**
+   * Add an entry into the histogram.
+   *
+   * @param x the value to insert.
+   */
+  void add(long x);
+
+  /**
+   * Clear the histogram, discarding the entries added so far.
+   */
+  void clear();
+
+  /**
+   * Return the value of the histogram at a single quantile.
+   *
+   * @param quantile the quantile to compute (presumably a fraction in [0, 1]; confirm against
+   *     the implementations, the interface itself does not state the range).
+   * @return the value at the requested quantile.
+   */
+  long getQuantile(double quantile);
+
+  /**
+   * Return the values of the histogram at several quantiles.
+   *
+   * @param quantiles array of quantiles to compute.
+   * @return the computed values (presumably one per entry of {@code quantiles}, in the same
+   *     order; confirm against the implementations).
+   */
+  long[] getQuantiles(double[] quantiles);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Histograms.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Histograms.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Histograms.java
new file mode 100644
index 0000000..6818b05
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Histograms.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * Static utilities for working with {@link Histogram} instances.
+ */
+public final class Histograms {
+
+  private Histograms() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Queries a histogram once per requested quantile via {@link Histogram#getQuantile(double)}.
+   *
+   * @param h the histogram to query
+   * @param quantiles the quantiles to look up
+   * @return a new array with the same length as {@code quantiles}, whose element {@code i} is
+   *     the histogram's answer for {@code quantiles[i]}
+   */
+  public static long[] extractQuantiles(Histogram h, double[] quantiles) {
+    long[] extracted = new long[quantiles.length];
+    int next = 0;
+    for (double quantile : quantiles) {
+      extracted[next++] = h.getQuantile(quantile);
+    }
+    return extracted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/JvmStats.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/JvmStats.java 
b/commons/src/main/java/org/apache/aurora/common/stats/JvmStats.java
new file mode 100644
index 0000000..03df9b6
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/JvmStats.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import com.google.common.collect.Iterables;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * Convenience class to export statistics about the JVM.
+ *
+ * <p>All stats are registered through {@link Stats} by the single static entry point
+ * {@link #export()}; the class itself cannot be instantiated.
+ */
+public class JvmStats {
+
+  private static final long BYTES_PER_MB = Amount.of(1L, 
Data.MB).as(Data.BYTES);
+  private static final double SECS_PER_NANO =
+      ((double) 1) / Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS);
+
+  private JvmStats() {
+    // Utility class.
+  }
+
+  /**
+   * Exports stats related to the JVM and runtime environment.
+   *
+   * <p>Registered stats cover, in order: free physical memory, free swap and process CPU time
+   * (only when the OS bean is a {@code com.sun.management.OperatingSystemMXBean}); max and open
+   * file descriptor counts (only when it is a
+   * {@code com.sun.management.UnixOperatingSystemMXBean}); wall-clock time, processor count,
+   * runtime memory, class loading, aggregate GC time and count, heap and non-heap usage, uptime,
+   * system load average and thread counts; a collection count and a collection time stat per
+   * garbage collector bean; and string stats for the JVM input arguments, every system property
+   * and every environment variable.
+   *
+   * <p>System property names and environment entries are enumerated when this method runs, so
+   * properties added later do not get a stat.  Property values are looked up on every read,
+   * while environment values are the ones captured by the enumeration.
+   *
+   * <p>NOTE(review): system properties and environment variables are exported verbatim; confirm
+   * that none of them carry credentials before exposing these stats outside the process.
+   */
+  public static void export() {
+    final OperatingSystemMXBean osMbean = 
ManagementFactory.getOperatingSystemMXBean();
+    if (osMbean instanceof com.sun.management.OperatingSystemMXBean) {
+      final com.sun.management.OperatingSystemMXBean sunOsMbean =
+          (com.sun.management.OperatingSystemMXBean) osMbean;
+
+      Stats.exportAll(
+          ImmutableList.<Stat<? extends Number>>builder()
+          .add(new StatImpl<Long>("system_free_physical_memory_mb") {
+            @Override public Long read() {
+              return sunOsMbean.getFreePhysicalMemorySize() / BYTES_PER_MB;
+            }
+          })
+          .add(new StatImpl<Long>("system_free_swap_mb") {
+            @Override public Long read() {
+              return sunOsMbean.getFreeSwapSpaceSize() / BYTES_PER_MB;
+            }
+          })
+          .add(
+          Rate.of(
+              new StatImpl<Long>("process_cpu_time_nanos") {
+                @Override public Long read() {
+                  return sunOsMbean.getProcessCpuTime();
+                }
+              
}).withName("process_cpu_cores_utilized").withScaleFactor(SECS_PER_NANO).build())
+          .build());
+    }
+    if (osMbean instanceof com.sun.management.UnixOperatingSystemMXBean) {
+      final com.sun.management.UnixOperatingSystemMXBean unixOsMbean =
+          (com.sun.management.UnixOperatingSystemMXBean) osMbean;
+
+      Stats.exportAll(ImmutableList.<Stat<? extends Number>>builder()
+          .add(new StatImpl<Long>("process_max_fd_count") {
+            @Override public Long read() { return 
unixOsMbean.getMaxFileDescriptorCount(); }
+          }).add(new StatImpl<Long>("process_open_fd_count") {
+            @Override public Long read() { return 
unixOsMbean.getOpenFileDescriptorCount(); }
+          }).build());
+    }
+
+    final Runtime runtime = Runtime.getRuntime();
+    final ClassLoadingMXBean classLoadingBean = 
ManagementFactory.getClassLoadingMXBean();
+    final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+    final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+
+    Stats.exportAll(ImmutableList.<Stat<? extends Number>>builder()
+      .add(new StatImpl<Long>("jvm_time_ms") {
+        @Override public Long read() { return System.currentTimeMillis(); }
+      })
+      .add(new StatImpl<Integer>("jvm_available_processors") {
+        @Override public Integer read() { return 
runtime.availableProcessors(); }
+      })
+      .add(new StatImpl<Long>("jvm_memory_free_mb") {
+        @Override public Long read() { return runtime.freeMemory() / 
BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Long>("jvm_memory_max_mb") {
+        @Override public Long read() { return runtime.maxMemory() / 
BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Long>("jvm_memory_mb_total") {
+        @Override public Long read() { return runtime.totalMemory() / 
BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Integer>("jvm_class_loaded_count") {
+        @Override public Integer read() { return 
classLoadingBean.getLoadedClassCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_class_total_loaded_count") {
+        @Override public Long read() { return 
classLoadingBean.getTotalLoadedClassCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_class_unloaded_count") {
+        @Override public Long read() { return 
classLoadingBean.getUnloadedClassCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_gc_collection_time_ms") {
+        @Override public Long read() {
+          long collectionTimeMs = 0;
+          for (GarbageCollectorMXBean bean : 
ManagementFactory.getGarbageCollectorMXBeans()) {
+            collectionTimeMs += bean.getCollectionTime();
+          }
+          return collectionTimeMs;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_gc_collection_count") {
+        @Override public Long read() {
+          long collections = 0;
+          for (GarbageCollectorMXBean bean : 
ManagementFactory.getGarbageCollectorMXBeans()) {
+            collections += bean.getCollectionCount();
+          }
+          return collections;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_used") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getUsed() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_committed") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getCommitted() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_max") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getMax() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_used") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getUsed() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_committed") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getCommitted() / 
BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_max") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getMax() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_uptime_secs") {
+        @Override public Long read() { return runtimeMXBean.getUptime() / 
1000; }
+      })
+      .add(new StatImpl<Double>("system_load_avg") {
+        @Override public Double read() { return 
osMbean.getSystemLoadAverage(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_peak") {
+        @Override public Integer read() { return threads.getPeakThreadCount(); 
}
+      })
+      .add(new StatImpl<Long>("jvm_threads_started") {
+        @Override public Long read() { return 
threads.getTotalStartedThreadCount(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_daemon") {
+        @Override public Integer read() { return 
threads.getDaemonThreadCount(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_active") {
+        @Override public Integer read() { return threads.getThreadCount(); }
+      })
+      .build());
+
+    // Export per garbage collector time and cycle count like Ostrich
+    // This is based on code in Birdcage: 
https://cgit.twitter.biz/birdcage/tree/ \
+    // ostrich/src/main/scala/com/twitter/ostrich/stats/StatsCollection.scala
+    
Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(),
+        new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){
+          @Override
+          public Stat<? extends Number> apply(final GarbageCollectorMXBean 
gcMXBean) {
+            return new StatImpl<Long>(
+                "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + 
"_collection_count") {
+              @Override public Long read() {
+                return gcMXBean.getCollectionCount();
+              }
+            };
+          }
+        }
+    ));
+
+    
Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(),
+        new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){
+          @Override
+          public Stat<? extends Number> apply(final GarbageCollectorMXBean 
gcMXBean) {
+            return new StatImpl<Long>(
+                "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + 
"_collection_time_ms") {
+              @Override public Long read() {
+                return gcMXBean.getCollectionTime();
+              }
+            };
+          }
+        }
+    ));
+
+    Stats.exportString(
+        new StatImpl<String>("jvm_input_arguments") {
+          @Override public String read() {
+            return runtimeMXBean.getInputArguments().toString();
+          }
+        }
+    );
+
+    for (final String property : System.getProperties().stringPropertyNames()) 
{
+      Stats.exportString(
+          new StatImpl<String>("jvm_prop_" + Stats.normalizeName(property)) {
+            @Override public String read() { return 
System.getProperty(property); }
+          });
+    }
+
+    for (final Map.Entry<String, String> environmentVariable : 
System.getenv().entrySet()) {
+      Stats.exportString(
+          new StatImpl<String>("system_env_" + 
Stats.normalizeName(environmentVariable.getKey())) {
+            @Override public String read() { return 
environmentVariable.getValue(); }
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/MovingAverage.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/MovingAverage.java 
b/commons/src/main/java/org/apache/aurora/common/stats/MovingAverage.java
new file mode 100644
index 0000000..339737b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/MovingAverage.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sampled stat that reports the arithmetic mean of the most recent readings of an input stat.
+ *
+ * <p>Each call to {@link #doSample()} reads the input once.  At most {@code windowSize}
+ * readings are retained; once the window is full the oldest reading is dropped before the new
+ * one is added.
+ *
+ * @author William Farner
+ */
+public class MovingAverage<T extends Number> extends SampledStat<Double> {
+
+  private static final int DEFAULT_WINDOW = 10;
+
+  private final Stat<T> input;
+
+  // Newest reading at the head, oldest at the tail; bounded to the window size.
+  private final LinkedBlockingDeque<T> samples;
+
+  // Running total of the readings currently held in the window.
+  private double sampleSum = 0;
+
+  private MovingAverage(String name, Stat<T> input, int windowSize) {
+    super(name, 0d);
+    Preconditions.checkArgument(windowSize > 1);
+
+    this.input = Preconditions.checkNotNull(input);
+    this.samples = new LinkedBlockingDeque<T>(windowSize);
+    Stats.export(input);
+  }
+
+  /**
+   * Creates a moving average over the default window of 10 samples, named after the input with
+   * an {@code _avg} suffix.
+   *
+   * @param input the stat to average.
+   * @param <T> the numeric type of the input.
+   * @return the new moving average.
+   */
+  public static <T extends Number> MovingAverage<T> of(Stat<T> input) {
+    return MovingAverage.of(input, DEFAULT_WINDOW);
+  }
+
+  /**
+   * Creates a moving average named after the input with an {@code _avg} suffix.
+   *
+   * @param input the stat to average.
+   * @param windowSize the number of samples to average over; must be greater than 1.
+   * @param <T> the numeric type of the input.
+   * @return the new moving average.
+   */
+  public static <T extends Number> MovingAverage<T> of(Stat<T> input, int windowSize) {
+    String derivedName = String.format("%s_avg", input.getName());
+    return MovingAverage.of(derivedName, input, windowSize);
+  }
+
+  /**
+   * Creates a moving average with an explicit name.
+   *
+   * @param name the name of the averaged stat.
+   * @param input the stat to average; must not be null.
+   * @param windowSize the number of samples to average over; must be greater than 1.
+   * @param <T> the numeric type of the input.
+   * @return the new moving average.
+   */
+  public static <T extends Number> MovingAverage<T> of(String name, Stat<T> input,
+      int windowSize) {
+    return new MovingAverage<T>(name, input, windowSize);
+  }
+
+  /**
+   * Reads the input once, slides the window forward and returns the mean of the readings now
+   * in the window.
+   */
+  @Override
+  public Double doSample() {
+    T latest = input.read();
+
+    // A full window has to give up its oldest reading before it can accept the new one.
+    if (samples.remainingCapacity() == 0) {
+      T evicted = samples.removeLast();
+      sampleSum -= evicted.doubleValue();
+    }
+
+    samples.addFirst(latest);
+    sampleSum += latest.doubleValue();
+
+    return sampleSum / samples.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/MovingWindowDelta.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/MovingWindowDelta.java 
b/commons/src/main/java/org/apache/aurora/common/stats/MovingWindowDelta.java
new file mode 100644
index 0000000..a20eed0
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/MovingWindowDelta.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+
+/**
+ * Delta over the most recent k sample periods.
+ *
+ * <p>Used with a counter, this yields the number of counts accumulated within a sliding window.
+ *
+ * <p>One sample period is the time in between {@link #doSample()} calls.  The previous input is
+ * initialised to zero, so the first sample contributes the input's full value as its delta.
+ *
+ * @author Feng Zhuge
+ */
+public class MovingWindowDelta<T extends Number> extends SampledStat<Long> {
+  private static final int DEFAULT_WINDOW_SIZE = 60;
+
+  // Per-period deltas, oldest first; bounded to the window size.
+  private final LinkedBlockingDeque<Long> deltaSeries;
+  private final Supplier<T> inputAccessor;
+
+  // Sum of the deltas currently held in deltaSeries.
+  long sumDelta = 0L;
+  // Input value observed by the previous sample.
+  long lastInput = 0L;
+
+  private MovingWindowDelta(String name, Supplier<T> inputAccessor, int windowSize) {
+    super(name, 0L);
+
+    Preconditions.checkArgument(windowSize >= 1);
+    Preconditions.checkNotNull(inputAccessor);
+    MorePreconditions.checkNotBlank(name);
+
+    this.deltaSeries = new LinkedBlockingDeque<Long>(windowSize);
+    this.inputAccessor = inputAccessor;
+
+    Stats.export(this);
+  }
+
+  /**
+   * Create a new MovingWindowDelta instance.
+   *
+   * @param name The name of the value to be tracked.
+   * @param inputAccessor The accessor of the value.
+   * @param windowSize How many sample periods the delta is calculated over; at least 1.
+   * @param <T> The type of the value.
+   * @return The created MovingWindowDelta instance.
+   */
+  public static <T extends Number> MovingWindowDelta<T> of(
+      String name, Supplier<T> inputAccessor, int windowSize) {
+    return new MovingWindowDelta<T>(name, inputAccessor, windowSize);
+  }
+
+  /**
+   * Create a new MovingWindowDelta instance using the default window size (currently 60).
+   *
+   * @param name The name of the value to be tracked.
+   * @param inputAccessor The accessor of the value.
+   * @param <T> The type of the value.
+   * @return The created MovingWindowDelta instance.
+   */
+  public static <T extends Number> MovingWindowDelta<T> of(String name,
+      Supplier<T> inputAccessor) {
+    return of(name, inputAccessor, DEFAULT_WINDOW_SIZE);
+  }
+
+  /**
+   * Reads the input once, records its change since the previous sample and returns the sum of
+   * the changes currently inside the window.
+   */
+  @Override
+  public Long doSample() {
+    // When the window is full its oldest delta drops out and must be deducted from the sum.
+    long expired = deltaSeries.remainingCapacity() == 0 ? deltaSeries.removeFirst() : 0L;
+
+    long current = inputAccessor.get().longValue();
+    long delta = current - lastInput;
+    lastInput = current;
+
+    deltaSeries.addLast(delta);
+    sumDelta += delta - expired;
+
+    return sumDelta;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/NumericStatExporter.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/NumericStatExporter.java 
b/commons/src/main/java/org/apache/aurora/common/stats/NumericStatExporter.java
new file mode 100644
index 0000000..7345edc
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/NumericStatExporter.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Stat exporter that extracts numeric {@link Stat}s from the {@link Stats} system, and exports
+ * them via a caller-defined sink.
+ *
+ * <p>The exporter shuts down the executor it was given when {@link #stop()} is called, so that
+ * executor should be dedicated to this exporter.
+ *
+ * @author William Farner
+ */
+public class NumericStatExporter {
+
+  private static final Logger LOG = Logger.getLogger(NumericStatExporter.class.getName());
+
+  private final ScheduledExecutorService executor;
+  private final Amount<Long, Time> exportInterval;
+  private final Closure<Map<String, ? extends Number>> exportSink;
+
+  private final Runnable exporter;
+
+  /**
+   * Creates a new numeric stat exporter that will export to the specified sink.
+   *
+   * @param exportSink Consumes stats.
+   * @param executor Executor to handle export thread.
+   * @param exportInterval Export period.
+   */
+  public NumericStatExporter(final Closure<Map<String, ? extends Number>> exportSink,
+      ScheduledExecutorService executor, Amount<Long, Time> exportInterval) {
+    checkNotNull(exportSink);
+    this.executor = checkNotNull(executor);
+    this.exportInterval = checkNotNull(exportInterval);
+    this.exportSink = exportSink;
+
+    // NOTE(review): per the scheduleAtFixedRate contract, an exception escaping this task
+    // suppresses all later runs, so a single sink failure silently ends periodic export.
+    exporter = new Runnable() {
+      @Override public void run() {
+        exportSink.execute(Maps.transformValues(
+            Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), READ_STAT));
+      }
+    };
+  }
+
+  /**
+   * Starts the stat exporter.
+   *
+   * <p>The export task is scheduled at a fixed rate using the export interval expressed in
+   * whole seconds, and a shutdown action is registered that stops the exporter and then pushes
+   * one final, freshly sampled export to the sink.
+   *
+   * @param shutdownRegistry Shutdown hook registry to allow the exporter to cleanly halt.
+   */
+  public void start(ShutdownRegistry shutdownRegistry) {
+    // NOTE(review): presumably an interval below one second converts to 0 here, which
+    // scheduleAtFixedRate rejects -- confirm against Amount#as.
+    long intervalSecs = exportInterval.as(Time.SECONDS);
+    executor.scheduleAtFixedRate(exporter, intervalSecs, intervalSecs, TimeUnit.SECONDS);
+
+    shutdownRegistry.addAction(new Command() {
+      @Override public void execute() {
+        stop();
+        exportSink.execute(Maps.transformValues(
+            Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), SAMPLE_AND_READ_STAT));
+      }
+    });
+  }
+
+  /**
+   * Stops the stat exporter by shutting down its executor.
+   *
+   * <p>An orderly shutdown is requested first so that an export already in flight may finish;
+   * if the executor has not terminated within one second its tasks are interrupted, and a
+   * failure to terminate within a further second is logged.  Because a shut-down executor
+   * accepts no new tasks, the exporter cannot be started again after this call.
+   */
+  public void stop() {
+    // A shutdown request must precede awaitTermination(): without it the periodic export task
+    // keeps the executor alive and the first wait below could only ever time out.
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.severe("Failed to stop stat exporter.");
+        }
+      }
+    } catch (InterruptedException e) {
+      executor.shutdownNow();
+      // Preserve the interrupt for the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Maps a stat to its name. */
+  public static final Function<Stat<?>, String> GET_NAME = new Function<Stat<?>, String>() {
+    @Override public String apply(Stat<?> stat) {
+      return stat.getName();
+    }
+  };
+
+  /** Maps a numeric stat to the value returned by {@link Stat#read()}. */
+  public static final Function<Stat<? extends Number>, Number> READ_STAT =
+      new Function<Stat<? extends Number>, Number>() {
+        @Override public Number apply(Stat<? extends Number> stat) {
+          return stat.read();
+        }
+      };
+
+  // Maps a recording stat to the value returned by sample(); used for the final export.
+  private static final Function<RecordingStat<? extends Number>, Number> SAMPLE_AND_READ_STAT =
+      new Function<RecordingStat<? extends Number>, Number>() {
+        @Override public Number apply(RecordingStat<? extends Number> stat) {
+          return stat.sample();
+        }
+      };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Percentile.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Percentile.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Percentile.java
new file mode 100644
index 0000000..d8046b6
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Percentile.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.Sampler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tracks percentiles of recorded values, optionally sampling the input.
+ *
+ * A value passed to {@link #record} is kept only if the configured sampler selects it (or if
+ * no sampler is configured).  One stat is exported per requested percentile.  Whenever the stat
+ * for the first requested percentile is sampled, the values recorded since the previous sample
+ * are closed off into a sampling window, and percentiles are computed over the K most recent
+ * windows.
+ *
+ * @author William Farner
+ */
+public class Percentile<T extends Number & Comparable<T>> {
+
+  /** Upper bound on the number of values buffered between two samples. */
+  @VisibleForTesting
+  static final int MAX_BUFFER_SIZE = 10001;
+
+  /** Chooses which recorded values are kept; {@code null} keeps every value. */
+  private final Sampler sampler;
+
+  private final Map<Double, SampledStat<Double>> statsByPercentile;
+
+  /** Values accepted since the last window was closed; also used as the lock for sampling. */
+  @VisibleForTesting
+  final LinkedList<T> samples = Lists.newLinkedList();
+
+  /** The retained sampling windows, oldest first. */
+  private final LinkedBlockingDeque<ArrayList<T>> sampleQueue;
+
+  /** Sorted contents of all retained windows, rebuilt when the first percentile is sampled. */
+  private final ArrayList<T> allSamples = new ArrayList<T>();
+
+  /**
+   * Creates a percentile tracker with a single sampling window that samples the given
+   * percentage of recorded events.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param samplePercent The percent of events to sample [0, 100].
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, float samplePercent, double... percentiles) {
+    this(name, new Sampler(samplePercent), percentiles);
+  }
+
+  /**
+   * Creates a percentile tracker with a single sampling window.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param sampler The sampler to use for selecting recorded events.
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, Sampler sampler, double... percentiles) {
+    this(name, 1, sampler, percentiles);
+  }
+
+  /**
+   * Creates a percentile tracker that computes percentiles over a moving set of sampling
+   * windows and exports one stat named {@code <name>_<percentile>_percentile} (with dots
+   * replaced by underscores) for each requested percentile.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param numSampleWindows How many sampling windows are used for calculation; at least 1.
+   * @param sampler The sampler to use for selecting recorded events. You may set sampler to
+   *        null to sample all input.
+   * @param percentiles The percentiles to track; at least one is required.
+   */
+  public Percentile(String name, int numSampleWindows,
+      @Nullable Sampler sampler, double... percentiles) {
+    MorePreconditions.checkNotBlank(name);
+    Preconditions.checkArgument(numSampleWindows >= 1, "Must have one or more sample windows.");
+    Preconditions.checkNotNull(percentiles);
+    Preconditions.checkArgument(percentiles.length > 0, "Must specify at least one percentile.");
+
+    this.sampler = sampler;
+    this.sampleQueue = new LinkedBlockingDeque<ArrayList<T>>(numSampleWindows);
+
+    ImmutableMap.Builder<Double, SampledStat<Double>> stats =
+        new ImmutableMap.Builder<Double, SampledStat<Double>>();
+
+    // Only the first percentile's stat rolls the window and re-sorts; the others reuse its work.
+    boolean first = true;
+    for (double percentile : percentiles) {
+      String statName =
+          String.format("%s_%s_percentile", name, percentile).replace('.', '_');
+
+      SampledStat<Double> stat = new PercentileVar(statName, percentile, first);
+      first = false;
+      Stats.export(stat);
+      stats.put(percentile, stat);
+    }
+
+    statsByPercentile = stats.build();
+  }
+
+  /**
+   * Get the variables associated with this percentile tracker.
+   *
+   * @return A map from tracked percentile to the Stat corresponding to it
+   */
+  public Map<Double, ? extends Stat<?>> getPercentiles() {
+    return ImmutableMap.copyOf(statsByPercentile);
+  }
+
+  @VisibleForTesting
+  SampledStat<Double> getPercentile(double percentile) {
+    return statsByPercentile.get(percentile);
+  }
+
+  /**
+   * Records an event.
+   *
+   * @param value The value to record if it is randomly selected based on the sampling rate.
+   */
+  public void record(T value) {
+    boolean rejected = sampler != null && !sampler.select();
+    if (rejected) {
+      return;
+    }
+
+    synchronized (samples) {
+      samples.addLast(value);
+
+      // Keep the buffer bounded by discarding the oldest values.
+      while (samples.size() > MAX_BUFFER_SIZE) {
+        samples.removeFirst();
+      }
+    }
+  }
+
+  /** The exported stat for one percentile of the enclosing tracker. */
+  private class PercentileVar extends SampledStat<Double> {
+    private final double percentile;
+    private final boolean sortFirst;
+
+    PercentileVar(String name, double percentile, boolean sortFirst) {
+      super(name, 0d);
+      this.percentile = percentile;
+      this.sortFirst = sortFirst;
+    }
+
+    @Override
+    public Double doSample() {
+      synchronized (samples) {
+        if (sortFirst) {
+          rollWindow();
+        }
+        return interpolate();
+      }
+    }
+
+    /**
+     * Closes the current buffer into a new window (evicting the oldest window when the queue
+     * is full) and rebuilds the sorted list of all retained values.  Caller holds the lock.
+     */
+    private void rollWindow() {
+      if (sampleQueue.remainingCapacity() == 0) {
+        sampleQueue.removeFirst();
+      }
+      sampleQueue.addLast(new ArrayList<T>(samples));
+      samples.clear();
+
+      allSamples.clear();
+      for (ArrayList<T> window : sampleQueue) {
+        allSamples.addAll(window);
+      }
+
+      Collections.sort(allSamples, Ordering.<T>natural());
+    }
+
+    /**
+     * Reads this stat's percentile from the sorted values, linearly interpolating between the
+     * two neighbouring values; yields 0 when there are no values.  Caller holds the lock.
+     */
+    private double interpolate() {
+      if (allSamples.isEmpty()) {
+        return 0d;
+      }
+
+      int lastIndex = allSamples.size() - 1;
+      double position = lastIndex * percentile / 100;
+      // Clamp the fractional position into [0, lastIndex].
+      if (position < 0d) {
+        position = 0d;
+      }
+      if (position > lastIndex) {
+        position = lastIndex;
+      }
+
+      int lowerIndex = (int) position;
+      double lower = allSamples.get(lowerIndex).doubleValue();
+      if (lowerIndex == lastIndex) {
+        return lower;
+      }
+
+      double residue = position - lowerIndex;
+      double upper = allSamples.get(lowerIndex + 1).doubleValue();
+      return lower * (1 - residue) + upper * residue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/PipelineStats.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/PipelineStats.java 
b/commons/src/main/java/org/apache/aurora/common/stats/PipelineStats.java
new file mode 100644
index 0000000..014a56a
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/PipelineStats.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks the latency of different pipeline stages in a process.
+ *
+ * @author William Farner
+ */
+public class PipelineStats {
+  private static final String FULL_PIPELINE_NAME = "full";
+
+  private final Time precision;
+  private final Clock clock;
+
+  private final Map<String, SlidingStats> stages;
+
+  /**
+   * Creates a new pipeline tracker with the given pipeline name and stages. 
The stage name "full"
+   * is reserved to represent the duration of the entire pipeline.
+   *
+   * @param pipelineName Name of the pipeline.
+   * @param stages Stage names.
+   * @param precision Precision for time interval recording.
+   */
+  public PipelineStats(String pipelineName, Set<String> stages, Time 
precision) {
+    this(pipelineName, stages, Clock.SYSTEM_CLOCK, precision);
+  }
+
+  @VisibleForTesting
+  PipelineStats(String pipelineName, Set<String> stages, Clock clock, Time 
precision) {
+    MorePreconditions.checkNotBlank(pipelineName);
+    MorePreconditions.checkNotBlank(stages);
+    Preconditions.checkArgument(!stages.contains(FULL_PIPELINE_NAME));
+
+    this.clock = Preconditions.checkNotNull(clock);
+    this.precision = Preconditions.checkNotNull(precision);
+
+    this.stages = Maps.newHashMap();
+    for (String stage : stages) {
+      this.stages.put(stage, new SlidingStats(
+          String.format("%s_%s", pipelineName, stage), precision.toString()));
+    }
+    this.stages.put(FULL_PIPELINE_NAME, new SlidingStats(
+        String.format("%s_%s", pipelineName, FULL_PIPELINE_NAME), 
precision.toString()));
+  }
+
+  private void record(Snapshot snapshot) {
+    for (Pair<String, Long> stage : snapshot.stages) {
+      stages.get(stage.getFirst()).accumulate(stage.getSecond());
+    }
+  }
+
+  public Snapshot newSnapshot() {
+    return new Snapshot(this);
+  }
+
+  @VisibleForTesting
+  public SlidingStats getStatsForStage(String stage) {
+    return stages.get(stage);
+  }
+
+  public class Snapshot {
+    private final List<Pair<String, Long>> stages = Lists.newLinkedList();
+    private final PipelineStats parent;
+
+    private String currentStage;
+    private long startTime;
+    private long ticker;
+
+    private Snapshot(PipelineStats parent) {
+      this.parent = parent;
+    }
+
+    /**
+     * Records the duration for the current pipeline stage, and advances to 
the next stage. The
+     * stage name must be one of the stages specified in the constructor.
+     *
+     * @param stageName Name of the stage to enter.
+     */
+    public void start(String stageName) {
+      record(Preconditions.checkNotNull(stageName));
+    }
+
+    private void record(String stageName) {
+      long now = Amount.of(clock.nowNanos(), Time.NANOSECONDS).as(precision);
+      if (currentStage != null) {
+        stages.add(Pair.of(currentStage, now - ticker));
+      } else {
+        startTime = now;
+      }
+
+      if (stageName == null) stages.add(Pair.of(FULL_PIPELINE_NAME, now - 
startTime));
+
+      ticker = now;
+      currentStage = stageName;
+    }
+
+    /**
+     * Stops the pipeline, recording the interval for the last registered 
stage.
+     * This is the same as calling {@link #start(String)} with {@code null};
+     *
+     */
+    public void end() {
+      record(null);
+      parent.record(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Precision.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Precision.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Precision.java
new file mode 100644
index 0000000..7000e2f
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Precision.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Precision expresses the maximum epsilon tolerated for a typical size of 
input
+ * e.g.: Precision(0.01, 1000) express that we tolerate a error of 1% for 1000 
entries
+ *       it means that max difference between the real quantile and the 
estimate one is
+ *       error = 0.01*1000 = 10
+ *       For an entry like (1 to 1000), q(0.5) will be [490 <= x <= 510] (real 
q(0.5) = 500)
+ */
+public class Precision {
+  private final double epsilon;
+  private final int n;
+
+  /**
+   * Create a Precision instance representing a precision per number of entries
+   *
+   * @param epsilon is the maximum error tolerated
+   * @param n size of the data set
+   */
+  public Precision(double epsilon, int n) {
+    Preconditions.checkArgument(0.0 < epsilon, "Epsilon must be positive!");
+    Preconditions.checkArgument(1 < n, "N (expected number of elements) must 
be greater than 1!");
+
+    this.epsilon = epsilon;
+    this.n = n;
+  }
+
+  public double getEpsilon() {
+    return epsilon;
+  }
+
+  public int getN() {
+    return n;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/PrintableHistogram.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/PrintableHistogram.java 
b/commons/src/main/java/org/apache/aurora/common/stats/PrintableHistogram.java
new file mode 100644
index 0000000..a587457
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/PrintableHistogram.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A histogram over non-negative values with caller-supplied bucket boundaries that can render
+ * itself as text.
+ */
+public class PrintableHistogram {
+  private final double[] bucketBoundaries;
+  private final int[] bucketCounts;
+  private int totalCount = 0;
+
+  /**
+   * Creates a histogram with the given bucket boundaries.  A lower boundary of 0 and an upper
+   * boundary of {@link Integer#MAX_VALUE} are implicitly added.  Passing no boundaries yields a
+   * histogram with the single implicit bucket.
+   *
+   * @param buckets Boundaries for histogram buckets, strictly increasing, the first one non-zero.
+   * @throws IllegalStateException if the first boundary is 0 or the boundaries are not strictly
+   *     increasing.
+   */
+  public PrintableHistogram(double ... buckets) {
+    // Guard the index so that an empty boundary list no longer fails with an
+    // ArrayIndexOutOfBoundsException.
+    Preconditions.checkState(buckets.length == 0 || buckets[0] != 0);
+
+    bucketBoundaries = new double[buckets.length + 2];
+    bucketBoundaries[0] = 0;
+    bucketCounts = new int[buckets.length + 2];
+    for (int i = 0; i < buckets.length; i++) {
+      if (i > 0) {
+        // Guava's message templates substitute only %s placeholders.
+        Preconditions.checkState(buckets[i] > buckets[i - 1],
+            "Bucket %s must be greater than %s.", buckets[i], buckets[i - 1]);
+      }
+      bucketBoundaries[i + 1] = buckets[i];
+    }
+
+    bucketBoundaries[bucketBoundaries.length - 1] = Integer.MAX_VALUE;
+  }
+
+  /**
+   * Adds a single occurrence of a value.
+   *
+   * @param value The non-negative value to add.
+   */
+  public void addValue(double value) {
+    addValue(value, 1);
+  }
+
+  /**
+   * Adds {@code count} occurrences of a value to the bucket whose range
+   * {@code (lower, upper]} contains it; 0 is counted in the first bucket.
+   *
+   * @param value The non-negative value to add.
+   * @param count The non-negative number of occurrences.
+   * @throws IllegalStateException if {@code value} or {@code count} is negative.
+   */
+  public void addValue(double value, int count) {
+    Preconditions.checkState(value >= 0);
+    Preconditions.checkState(count >= 0);
+    Preconditions.checkState(bucketBoundaries.length > 1);
+
+    // Count the boundaries strictly below the value; the bucket is one less than that.
+    int bucketId = -1;
+    for (double boundary : bucketBoundaries) {
+      if (value <= boundary) {
+        break;
+      }
+      bucketId++;
+    }
+
+    bucketId = Math.max(0, bucketId);
+    bucketId = Math.min(bucketCounts.length - 1, bucketId);
+    bucketCounts[bucketId] += count;
+    totalCount += count;
+  }
+
+  /**
+   * Returns the fraction of all added occurrences that fell into the given bucket.
+   *
+   * @param bucketId Zero-based bucket index.
+   * @return The ratio in [0, 1]; 0 when nothing has been added yet.
+   * @throws IllegalStateException if {@code bucketId} is out of range.
+   */
+  public double getBucketRatio(int bucketId) {
+    Preconditions.checkState(bucketId >= 0);
+    Preconditions.checkState(bucketId < bucketCounts.length);
+    if (totalCount == 0) {
+      // Avoid 0/0 = NaN for an empty histogram.
+      return 0;
+    }
+    return (double) bucketCounts[bucketId] / totalCount;
+  }
+
+  /**
+   * Renders one line pair per bucket: its range, then a bar of '#' characters proportional to
+   * the bucket's percentage, the percentage itself and the raw count.
+   */
+  @Override
+  public String toString() {
+    StringBuilder display = new StringBuilder();
+    display.append("Histogram: ");
+    for (int bucketId = 0; bucketId < bucketCounts.length - 1; bucketId++) {
+      display.append(String.format("\n(%g - %g]\n\t",
+          bucketBoundaries[bucketId], bucketBoundaries[bucketId + 1]));
+      double percent = getBucketRatio(bucketId) * 100;
+      for (int i = 0; i < percent; i++) {
+        display.append('#');
+      }
+      display.append(String.format(" %.2g%% (%d)", percent, bucketCounts[bucketId]));
+    }
+
+    return display.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Rate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Rate.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Rate.java
new file mode 100644
index 0000000..ff29c39
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Rate.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Ticker;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Function to compute a windowed per-second rate of a value.
+ *
+ * Each sample compares the current input value and tick with the oldest retained sample and
+ * reports the scaled change per second between them.
+ *
+ * @author William Farner
+ */
+public class Rate<T extends Number> extends SampledStat<Double> {
+
+  private static final int DEFAULT_WINDOW_SIZE = 1;
+  private static final double DEFAULT_SCALE_FACTOR = 1;
+  private static final long NANOS_PER_SEC = Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS);
+
+  private final Supplier<T> inputAccessor;
+  private final Ticker ticker;
+  private final double scaleFactor;
+
+  /** Retained (tick, value) samples, newest first; capacity is the window size. */
+  private final LinkedBlockingDeque<Pair<Long, Double>> samples;
+
+  private Rate(String name, Supplier<T> inputAccessor, int windowSize, double scaleFactor,
+      Ticker ticker) {
+    super(name, 0d);
+
+    this.inputAccessor = Preconditions.checkNotNull(inputAccessor);
+    this.ticker = Preconditions.checkNotNull(ticker);
+    samples = new LinkedBlockingDeque<Pair<Long, Double>>(windowSize);
+    Preconditions.checkArgument(scaleFactor != 0, "Scale factor must be non-zero!");
+    this.scaleFactor = scaleFactor;
+  }
+
+  /** Starts building a rate over a stat; the rate is named {@code <stat name>_per_sec}. */
+  public static <T extends Number> Builder<T> of(Stat<T> input) {
+    return new Builder<T>(input);
+  }
+
+  /** Starts building a named rate over a supplied value. */
+  public static Builder<Long> of(String name, Supplier<Long> input) {
+    return new Builder<Long>(name, input);
+  }
+
+  /** Starts building a named rate over a live {@link AtomicInteger}. */
+  public static Builder<AtomicInteger> of(String name, AtomicInteger input) {
+    return new Builder<AtomicInteger>(name, input);
+  }
+
+  /** Starts building a named rate over a live {@link AtomicLong}. */
+  public static Builder<AtomicLong> of(String name, AtomicLong input) {
+    return new Builder<AtomicLong>(name, input);
+  }
+
+  /**
+   * Computes the rate of change since the oldest retained sample and stores the new sample.
+   *
+   * @return The scaled per-second rate; 0 for the first sample or when no ticks have elapsed.
+   */
+  @Override
+  public Double doSample() {
+    // Read the input exactly once: T may be a live counter (e.g. an AtomicLong), and reading it
+    // separately for the delta and for the stored sample would silently drop any increments
+    // that happen in between.
+    double newValue = inputAccessor.get().doubleValue();
+    long newTimestamp = ticker.read();
+
+    double rate = 0;
+    if (!samples.isEmpty()) {
+      Pair<Long, Double> oldestSample = samples.peekLast();
+
+      double dy = newValue - oldestSample.getSecond();
+      double dt = newTimestamp - oldestSample.getFirst();
+      rate = dt == 0 ? 0 : (NANOS_PER_SEC * scaleFactor * dy) / dt;
+    }
+
+    // Evict the oldest sample once the window is full.
+    if (samples.remainingCapacity() == 0) {
+      samples.removeLast();
+    }
+    samples.addFirst(Pair.of(newTimestamp, newValue));
+
+    return rate;
+  }
+
+  /** Builder for {@link Rate} instances. */
+  public static class Builder<T extends Number> {
+
+    private String name;
+    private int windowSize = DEFAULT_WINDOW_SIZE;
+    private double scaleFactor = DEFAULT_SCALE_FACTOR;
+    private Supplier<T> inputAccessor;
+    private Ticker ticker = Ticker.systemTicker();
+
+    Builder(String name, final T input) {
+      this.name = name;
+      inputAccessor = Suppliers.ofInstance(input);
+    }
+
+    Builder(String name, Supplier<T> input) {
+      this.name = name;
+      inputAccessor = input;
+    }
+
+    Builder(final Stat<T> input) {
+      Stats.export(input);
+      this.name = input.getName() + "_per_sec";
+      inputAccessor = new Supplier<T>() {
+        @Override public T get() { return input.read(); }
+      };
+    }
+
+    /** Overrides the name of the rate stat. */
+    public Builder<T> withName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    /** Sets how many samples are retained; the rate is measured against the oldest one. */
+    public Builder<T> withWindowSize(int windowSize) {
+      this.windowSize = windowSize;
+      return this;
+    }
+
+    /** Sets the factor the per-second rate is multiplied by; must be non-zero. */
+    public Builder<T> withScaleFactor(double scaleFactor) {
+      this.scaleFactor = scaleFactor;
+      return this;
+    }
+
+    @VisibleForTesting
+    Builder<T> withTicker(Ticker ticker) {
+      this.ticker = ticker;
+      return this;
+    }
+
+    /** Creates the rate stat from the current builder settings. */
+    public Rate<T> build() {
+      return new Rate<T>(name, inputAccessor, windowSize, scaleFactor, ticker);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Ratio.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Ratio.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Ratio.java
new file mode 100644
index 0000000..c332dd3
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Ratio.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * Function to compute the ratio of two time series.
+ * The first argument is the numerator, and the second is the denominator.  Assumes that the
+ * timestamps of the two arguments are suitably synchronized (i.e. the ith point in the
+ * numerator time series corresponds with the ith point of the denominator time series).
+ *
+ * @author William Farner
+ */
+public class Ratio extends SampledStat<Double> {
+
+  private final Supplier<Number> numeratorAccessor;
+  private final Supplier<Number> denominatorAccessor;
+
+  private Ratio(String name, Supplier<Number> numeratorAccessor,
+      Supplier<Number> denominatorAccessor) {
+    super(name, 0d);
+    this.numeratorAccessor = Preconditions.checkNotNull(numeratorAccessor);
+    this.denominatorAccessor = Preconditions.checkNotNull(denominatorAccessor);
+  }
+
+  /**
+   * Creates a ratio of two stats named {@code <numerator name>_per_<denominator name>}.
+   *
+   * @param numerator Stat read for the numerator.
+   * @param denominator Stat read for the denominator.
+   * @return The ratio stat.
+   */
+  public static <T extends Number> Ratio of(Stat<T> numerator, Stat<T> denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    String name = String.format("%s_per_%s", numerator.getName(), denominator.getName());
+    return Ratio.of(name, numerator, denominator);
+  }
+
+  /**
+   * Creates a named ratio of two stats.  Both input stats are exported as a side effect.
+   *
+   * @param name Name of the ratio stat.
+   * @param numerator Stat read for the numerator.
+   * @param denominator Stat read for the denominator.
+   * @return The ratio stat.
+   */
+  public static <T extends Number> Ratio of(String name, final Stat<T> numerator,
+      final Stat<T> denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    Stats.export(numerator);
+    Stats.export(denominator);
+
+    return new Ratio(name,
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return numerator.read();
+          }
+        },
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return denominator.read();
+          }
+        });
+  }
+
+  /**
+   * Creates a named ratio of two {@link Number} instances; each sample reads their current
+   * {@code doubleValue()}.
+   *
+   * @param name Name of the ratio stat.
+   * @param numerator Number read for the numerator.
+   * @param denominator Number read for the denominator.
+   * @return The ratio stat.
+   */
+  public static Ratio of(String name, final Number numerator, final Number denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    return new Ratio(name,
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return numerator;
+          }
+        },
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return denominator;
+          }
+        });
+  }
+
+  /**
+   * Samples both inputs and divides them.
+   *
+   * @return The numerator divided by the denominator, or 0 when the denominator is zero or
+   *     either input is NaN.
+   */
+  @Override
+  public Double doSample() {
+    double numeratorValue = numeratorAccessor.get().doubleValue();
+    double denominatorValue = denominatorAccessor.get().doubleValue();
+
+    // NaN never compares equal to anything, itself included, so "== Double.NaN" is always
+    // false; Double.isNaN is the only reliable check.
+    if ((denominatorValue == 0)
+        || Double.isNaN(denominatorValue)
+        || Double.isNaN(numeratorValue)) {
+      return 0d;
+    }
+
+    return numeratorValue / denominatorValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/RecordingStat.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/RecordingStat.java 
b/commons/src/main/java/org/apache/aurora/common/stats/RecordingStat.java
new file mode 100644
index 0000000..a3fad3f
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/RecordingStat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * A numeric {@link Stat} that can additionally be asked to take a sample of its value.
+ *
+ * @author William Farner
+ */
+interface RecordingStat<T extends Number> extends Stat<T> {
+
+  /**
+   * Invoked by the variable sampler each time a sample is taken.  Variable history should be
+   * stored only in response to calls to this method.
+   *
+   * Note - if sampling this value depends on other variables, it is imperative that those
+   * variables' values are updated first (and available via {@link Stat#read()}).
+   *
+   * @return A new sample of the variable.
+   */
+  T sample();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/RecordingStatImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/RecordingStatImpl.java 
b/commons/src/main/java/org/apache/aurora/common/stats/RecordingStatImpl.java
new file mode 100644
index 0000000..b76c14a
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/RecordingStatImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A convenience class to wrap a {@link RecordingStat}.
+ *
+ * @author William Farner
+ */
+class RecordingStatImpl<T extends Number> implements RecordingStat<T> {
+  private final Stat<T> recorded;
+  private final String name;
+
+  public RecordingStatImpl(Stat<T> recorded) {
+    this.recorded = Preconditions.checkNotNull(recorded);
+    this.name = Stats.validateName(recorded.getName());
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public T sample() {
+    return read();
+  }
+
+  @Override
+  public T read() {
+    return recorded.read();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/RequestStats.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/RequestStats.java 
b/commons/src/main/java/org/apache/aurora/common/stats/RequestStats.java
new file mode 100644
index 0000000..5467810
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/RequestStats.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Statistics for a client connection to an external service: request latency and rate, plus
+ * error, reconnect and timeout counters.
+ *
+ * <p>All counters and derived rates are registered through {@link Stats} when an instance is
+ * constructed, using the supplied name as a prefix.
+ *
+ * @author William Farner
+ */
+public class RequestStats implements StatsProvider.RequestTimer {
+
+  private static final float DEFAULT_SAMPLE_PERCENT = 10;
+  private static final double[] DEFAULT_PERCENTILES = {10, 50, 90, 99, 99.9, 99.99};
+
+  private final SlidingStats requests;
+  private final Percentile<Long> percentile;
+
+  private final AtomicLong errors;
+  private final AtomicLong reconnects;
+  private final AtomicLong timeouts;
+
+  /**
+   * Creates request statistics that track latency percentiles with the default percentile set
+   * and the default sampling percentage.
+   *
+   * @param name The unique name for this request type.
+   */
+  public RequestStats(String name) {
+    this(name, new Percentile<Long>(name, DEFAULT_SAMPLE_PERCENT, DEFAULT_PERCENTILES));
+  }
+
+  /**
+   * Creates request statistics backed by a caller-supplied percentile tracker.
+   *
+   * @param name The unique name for this request type; used as the prefix of every exported
+   *     stat.
+   * @param percentile The percentile tracker, or {@code null} to disable percentile tracking.
+   */
+  public RequestStats(String name, @Nullable Percentile<Long> percentile) {
+    this.requests = new SlidingStats(name + "_requests", "micros");
+    this.percentile = percentile;
+    this.errors = Stats.exportLong(name + "_errors");
+    this.reconnects = Stats.exportLong(name + "_reconnects");
+    this.timeouts = Stats.exportLong(name + "_timeouts");
+
+    // The request rate is the denominator shared by the error and timeout ratios below.
+    Rate<AtomicLong> requestsPerSec =
+        Rate.of(name + "_requests_per_sec", this.requests.getEventCounter()).build();
+    Stats.export(Ratio.of(name + "_error_rate",
+        Rate.of(name + "_errors_per_sec", this.errors).build(), requestsPerSec));
+
+    Rate<AtomicLong> timeoutsPerSec = Rate.of(name + "_timeouts_per_sec", this.timeouts).build();
+    Stats.export(timeoutsPerSec);
+    Stats.export(Ratio.of(name + "_timeout_rate", timeoutsPerSec, requestsPerSec));
+  }
+
+  public SlidingStats getSlidingStats() {
+    return requests;
+  }
+
+  public AtomicLong getErrorCounter() {
+    return errors;
+  }
+
+  public AtomicLong getReconnectCounter() {
+    return reconnects;
+  }
+
+  public AtomicLong getTimeoutCounter() {
+    return timeouts;
+  }
+
+  /**
+   * Returns the percentile tracker handed to the constructor.
+   *
+   * @return The tracker, or {@code null} when percentile tracking is disabled.
+   */
+  public Percentile<Long> getPercentile() {
+    return percentile;
+  }
+
+  /**
+   * Records one completed request together with its latency.
+   *
+   * @param latencyMicros The elapsed time required to complete the request.
+   */
+  public void requestComplete(long latencyMicros) {
+    requests.accumulate(latencyMicros);
+    if (percentile == null) {
+      // Percentile tracking is disabled for this instance.
+      return;
+    }
+    percentile.record(latencyMicros);
+  }
+
+  /**
+   * Records a failed request: counts it as a completed request with zero latency, then
+   * increments the error counter.
+   */
+  public void incErrors() {
+    requestComplete(0);
+    errors.incrementAndGet();
+  }
+
+  /**
+   * Records a failed request together with the time spent on it: counts it as a completed
+   * request with the given latency, then increments the error counter.
+   *
+   * @param latencyMicros The elapsed time before the error occurred.
+   */
+  public void incErrors(long latencyMicros) {
+    requestComplete(latencyMicros);
+    errors.incrementAndGet();
+  }
+
+  /**
+   * Increments the reconnect counter.
+   */
+  public void incReconnects() {
+    reconnects.incrementAndGet();
+  }
+
+  /**
+   * Increments the timeout counter.
+   */
+  public void incTimeouts() {
+    timeouts.incrementAndGet();
+  }
+
+  public long getErrorCount() {
+    return errors.get();
+  }
+
+  public long getReconnectCount() {
+    return reconnects.get();
+  }
+
+  public long getTimeoutCount() {
+    return timeouts.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/ReservoirSampler.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/ReservoirSampler.java 
b/commons/src/main/java/org/apache/aurora/common/stats/ReservoirSampler.java
new file mode 100644
index 0000000..79e7e04
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/ReservoirSampler.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Vector;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.util.Random;
+
+/**
+ * An in-memory reservoir sampler: keeps a bounded random sample of the items offered to it.
+ *
+ * <p>Once the reservoir is full, one random number is drawn for every item offered. Schemes
+ * that avoid most of those draws exist; see "Random Sampling with a Reservoir", Vitter,
+ * 1985.</p>
+ * <p>TODO (delip): Fix this when the problem arises</p>
+ *
+ * @param <T> Type of the sample
+ * @author Delip Rao
+ */
+public class ReservoirSampler<T> {
+  private final Vector<T> reservoir = new Vector<T>();
+  private final int numSamples;
+
+  private final Random random;
+  private int numItemsSeen = 0;
+
+  /**
+   * Creates a sampler with the given reservoir size that draws from the supplied random
+   * number generator.
+   *
+   * @param numSamples Maximum number of samples to retain in the reservoir. Must be positive.
+   * @param random The random number generator to use for sampling; must not be {@code null}.
+   */
+  public ReservoirSampler(int numSamples, Random random) {
+    Preconditions.checkArgument(numSamples > 0, "numSamples should be positive");
+    Preconditions.checkNotNull(random);
+    this.random = random;
+    this.numSamples = numSamples;
+  }
+
+  /**
+   * Creates a sampler with the given reservoir size that uses the default random number
+   * generator.
+   *
+   * @param numSamples Maximum number of samples to retain in the reservoir. Must be positive.
+   */
+  public ReservoirSampler(int numSamples) {
+    this(numSamples, Random.Util.newDefaultRandom());
+  }
+
+  /**
+   * Offers an item to the sampler, storing it in the reservoir if selected.
+   *
+   * <p>While the reservoir holds fewer than {@code numSamples} items the item is always kept.
+   * After that an index is drawn with bound {@code numItemsSeen + 1}, and the item overwrites
+   * the sample at that index only when the index falls inside the reservoir.
+   *
+   * @param item The item to sample - may not be null.
+   */
+  public void sample(T item) {
+    Preconditions.checkNotNull(item);
+    boolean reservoirFull = reservoir.size() >= numSamples;
+    if (reservoirFull) {
+      // Pick a candidate slot; draws beyond the reservoir mean the item is discarded.
+      int slot = random.nextInt(numItemsSeen + 1);
+      if (slot < numSamples) {
+        reservoir.set(slot, item);
+      }
+    } else {
+      // Still filling up: every item is retained.
+      reservoir.add(item);
+    }
+    numItemsSeen++;
+  }
+
+  /**
+   * Gets the samples currently held in the reservoir. The returned object is the sampler's
+   * own backing collection, not a copy.
+   *
+   * @return A sequence of the samples. No guarantee is provided on the order of the samples.
+   */
+  public Iterable<T> getSamples() {
+    return reservoir;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/SampledStat.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/SampledStat.java 
b/commons/src/main/java/org/apache/aurora/common/stats/SampledStat.java
new file mode 100644
index 0000000..f1af71e
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/SampledStat.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * A convenience class to perform the basic tasks needed for a {@link RecordingStat} except the
+ * actual value calculation.
+ *
+ * <p>Subclasses supply the value through {@link #doSample()}; this class remembers the value
+ * from the most recent {@link #sample()} call and serves it from {@link #read()}.
+ *
+ * @param <T> The numeric type of the sampled value.
+ * @author William Farner
+ */
+public abstract class SampledStat<T extends Number> extends StatImpl<T>
+    implements RecordingStat<T> {
+
+  // Volatile so that a value stored by sample() is visible to read() on other threads.
+  private volatile T prevValue;
+
+  /**
+   * Creates a sampled stat.
+   *
+   * @param name The name of the stat.
+   * @param defaultValue The value reported by {@link #read()} until the first sample is
+   *     taken; may be {@code null}.
+   */
+  public SampledStat(String name, T defaultValue) {
+    super(name);
+    this.prevValue = defaultValue; /* Don't forbid null. */
+  }
+
+  /**
+   * Computes the current value of the stat.
+   *
+   * @return The freshly computed value.
+   */
+  public abstract T doSample();
+
+  /**
+   * Takes a new sample, stores it as the most recent value and returns it.
+   *
+   * @return The value produced by this call's {@link #doSample()} invocation.
+   */
+  @Override
+  public final T sample() {
+    // Return the local copy rather than re-reading the volatile field: a concurrent sample()
+    // could overwrite the field between the write and the read, and this call would then hand
+    // back a value it did not compute.
+    T sampled = doSample();
+    prevValue = sampled;
+    return sampled;
+  }
+
+  /**
+   * Returns the most recently stored sample without computing a new one.
+   *
+   * @return The last sampled value, or the default value if no sample has been taken yet.
+   */
+  @Override
+  public T read() {
+    return prevValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Significance.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Significance.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Significance.java
new file mode 100644
index 0000000..ff040e9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Significance.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * Signed significance scores comparing an observed amount against an expected amount.
+ *
+ * @author Gilad Mishne
+ */
+public class Significance {
+
+  /**
+   * Computes a chi-square style score, negated when the observation falls short of the
+   * expectation.
+   *
+   * @param observed The observed amount.
+   * @param expected The expected amount.
+   * @return [(observed - expected) ** 2 / expected] * sign(observed - expected)
+   */
+  public static double chiSqrScore(double observed, double expected) {
+    double deviation = observed - expected;
+    double magnitude = Math.pow(deviation, 2) / expected;
+    return observed < expected ? -magnitude : magnitude;
+  }
+
+  /**
+   * Computes a log-likelihood style score.
+   *
+   * @param observed The observed amount.
+   * @param expected The expected amount.
+   * @return {@code -expected} when {@code observed} is zero; otherwise {@code observed} when
+   *     {@code expected} is zero; otherwise -2 * observed * log(observed / expected), negated
+   *     when {@code observed < expected}
+   */
+  public static double logLikelihood(double observed, double expected) {
+    // The zero cases are handled first so the logarithm below never sees 0 or a division by 0.
+    if (observed == 0) {
+      return -expected;
+    }
+    if (expected == 0) {
+      return observed;
+    }
+    double score = -2 * observed * Math.log(observed / expected);
+    return observed < expected ? -score : score;
+  }
+
+  private Significance() {
+    // Static utility holder; never instantiated.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java 
b/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
new file mode 100644
index 0000000..3341672
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks event statistics over a sliding window of time. An event is something that has a
+ * frequency and associated total.
+ *
+ * <p>Construction exports a total counter, an event counter and a per-event ratio built from
+ * the per-second rates of those two counters.
+ *
+ * @author William Farner
+ */
+public class SlidingStats {
+
+  private static final int DEFAULT_WINDOW_SIZE = 1;
+
+  private final AtomicLong total;
+  private final AtomicLong events;
+  private final Stat<Double> perEventLatency;
+
+  /**
+   * Creates a new sliding statistic with the given name and the default window size.
+   *
+   * @param name Name for this stat collection.
+   * @param totalUnitDisplay String to display for the total counter unit.
+   */
+  public SlidingStats(String name, String totalUnitDisplay) {
+    this(name, totalUnitDisplay, DEFAULT_WINDOW_SIZE);
+  }
+
+  /**
+   * Creates a new sliding statistic with the given name and window size.
+   *
+   * @param name Name for this stat collection; must not be blank.
+   * @param totalUnitDisplay String to display for the total counter unit.
+   * @param windowSize The window size for the per second Rate and Ratio stats.
+   */
+  public SlidingStats(String name, String totalUnitDisplay, int windowSize) {
+    MorePreconditions.checkNotBlank(name);
+
+    String unitPrefix = name + "_" + totalUnitDisplay;
+    String totalDisplay = unitPrefix + "_total";
+    String eventDisplay = name + "_events";
+
+    this.total = Stats.exportLong(totalDisplay);
+    this.events = Stats.exportLong(eventDisplay);
+    // Per-event value = (total per second) / (events per second).
+    this.perEventLatency = Stats.export(Ratio.of(unitPrefix + "_per_event",
+        Rate.of(totalDisplay + "_per_sec", this.total).withWindowSize(windowSize).build(),
+        Rate.of(eventDisplay + "_per_sec", this.events).withWindowSize(windowSize).build()));
+  }
+
+  public AtomicLong getTotalCounter() {
+    return total;
+  }
+
+  public AtomicLong getEventCounter() {
+    return events;
+  }
+
+  public Stat<Double> getPerEventLatency() {
+    return perEventLatency;
+  }
+
+  /**
+   * Adds an offset to the total counter and counts one event. This is useful for tracking
+   * things like latency of operations.
+   *
+   * TODO(William Farner): Implement a wrapper to SlidingStats that expects to accumulate time,
+   *    and can convert between time units.
+   *
+   * @param value The value to accumulate.
+   */
+  public void accumulate(long value) {
+    total.addAndGet(value);
+    events.incrementAndGet();
+  }
+
+  /** Renders the total counter and the event counter, separated by a single space. */
+  @Override
+  public String toString() {
+    return new StringBuilder().append(total).append(" ").append(events).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Stat.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Stat.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Stat.java
new file mode 100644
index 0000000..6537859
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Stat.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * A stat that may only be read, no method calls will cause any internal state changes.
+ *
+ * @param <T> The type of value this stat reports.
+ * @author William Farner
+ */
+public interface Stat<T> {
+
+  /**
+   * Gets the name of this stat. For sake of convention, variable names should be alphanumeric,
+   * and use underscores.
+   *
+   * @return The variable name.
+   */
+  String getName();
+
+  /**
+   * Retrieves the most recent value of the stat.
+   *
+   * @return The most recent value.
+   */
+  T read();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/StatImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/StatImpl.java 
b/commons/src/main/java/org/apache/aurora/common/stats/StatImpl.java
new file mode 100644
index 0000000..adfcd6f
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/StatImpl.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+/**
+ * Base class that stores a stat's name so that implementers do not have to provide
+ * {@link #getName()} themselves.
+ *
+ * @param <T> The type of value the stat reports.
+ * @author William Farner
+ */
+public abstract class StatImpl<T> implements Stat<T> {
+
+  private final String name;
+
+  /**
+   * Creates a stat with the given name.
+   *
+   * @param name The name of the stat; must not be blank.
+   */
+  public StatImpl(String name) {
+    String checkedName = MorePreconditions.checkNotBlank(name);
+    this.name = checkedName;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/StatRegistry.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/StatRegistry.java 
b/commons/src/main/java/org/apache/aurora/common/stats/StatRegistry.java
new file mode 100644
index 0000000..54e8419
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/StatRegistry.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * A registry of stats: a single point from which every registered {@link RecordingStat} can be
+ * enumerated.
+ *
+ * @author William Farner
+ */
+public interface StatRegistry {
+
+  /**
+   * Gets all stats in the registry.
+   *
+   * @return All registered stats.
+   */
+  Iterable<RecordingStat<? extends Number>> getStats();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Statistics.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/Statistics.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Statistics.java
new file mode 100644
index 0000000..498abb0
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Statistics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * A simple class to keep running statistics that require O(1) storage.
+ *
+ * <p>Mean and variance are maintained incrementally (Welford's method), so individual values
+ * are never retained. Instances hold plain mutable fields and perform no synchronization.
+ *
+ * @author William Farner
+ */
+public class Statistics implements StatisticsInterface {
+  private long populationSize;
+  private long sum;
+  // Running sum of squared deviations from the mean; divided by the population in variance().
+  private double accumulatedVariance;
+  private double runningMean;
+
+  private long minValue;
+  private long maxValue;
+
+  /** Creates an empty statistics object. */
+  public Statistics() {
+    clear();
+  }
+
+  /**
+   * Folds one value into the running statistics.
+   *
+   * @param value The value to accumulate.
+   */
+  @Override
+  public void accumulate(long value) {
+    populationSize++;
+    sum += value;
+
+    // Incremental mean/variance update: delta uses the old mean, the second factor the new one.
+    double delta = value - runningMean;
+    runningMean += delta / populationSize;
+    accumulatedVariance += delta * (value - runningMean);
+
+    minValue = Math.min(minValue, value);
+    maxValue = Math.max(maxValue, value);
+  }
+
+  /**
+   * Resets every statistic to its initial state. Min and max are reset to
+   * {@link Long#MAX_VALUE} and {@link Long#MIN_VALUE} respectively so that the first
+   * accumulated value replaces both.
+   */
+  @Override
+  public void clear() {
+    populationSize = 0;
+    sum = 0;
+    accumulatedVariance = 0;
+    runningMean = 0;
+    minValue = Long.MAX_VALUE;
+    maxValue = Long.MIN_VALUE;
+  }
+
+  /**
+   * @return The population variance of the accumulated values; {@code NaN} when nothing has
+   *     been accumulated.
+   */
+  @Override
+  public double variance() {
+    return accumulatedVariance / populationSize;
+  }
+
+  /**
+   * @return The square root of {@link #variance()}.
+   */
+  @Override
+  public double standardDeviation() {
+    return Math.sqrt(variance());
+  }
+
+  /**
+   * @return The mean of the accumulated values; {@code 0} when nothing has been accumulated.
+   */
+  @Override
+  public double mean() {
+    return runningMean;
+  }
+
+  /**
+   * @return The smallest accumulated value; {@link Long#MAX_VALUE} when nothing has been
+   *     accumulated.
+   */
+  @Override
+  public long min() {
+    return minValue;
+  }
+
+  /**
+   * @return The largest accumulated value; {@link Long#MIN_VALUE} when nothing has been
+   *     accumulated.
+   */
+  @Override
+  public long max() {
+    return maxValue;
+  }
+
+  /**
+   * @return The difference between the largest and smallest accumulated values; {@code 0}
+   *     when nothing has been accumulated.
+   */
+  @Override
+  public long range() {
+    // With no data the sentinels are MIN_VALUE/MAX_VALUE and their difference overflows to 1,
+    // which would be reported as a real range.
+    if (populationSize == 0) {
+      return 0;
+    }
+    return maxValue - minValue;
+  }
+
+  /**
+   * @return The sum of the accumulated values.
+   */
+  @Override
+  public long sum() {
+    return sum;
+  }
+
+  /**
+   * @return The number of values accumulated since construction or the last {@link #clear()}.
+   */
+  @Override
+  public long populationSize() {
+    return populationSize;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Mean: %f, Min: %d, Max: %d, Range: %d, Stddev: %f, Variance: %f, " +
+        "Population: %d, Sum: %d", mean(), min(), max(), range(), standardDeviation(),
+        variance(), populationSize(), sum());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/StatisticsInterface.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/StatisticsInterface.java 
b/commons/src/main/java/org/apache/aurora/common/stats/StatisticsInterface.java
new file mode 100644
index 0000000..893c069
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/StatisticsInterface.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+/**
+ * Interface representing statistics of a set of (long) elements.
+ */
+public interface StatisticsInterface {
+  /**
+   * Adds a value to the statistics.
+   *
+   * @param value The value to accumulate.
+   */
+  void accumulate(long value);
+
+  /**
+   * Clears the statistics (equivalent to creating a new instance).
+   */
+  void clear();
+
+  /**
+   * Returns the variance of the inserted elements.
+   *
+   * @return The variance.
+   */
+  double variance();
+
+  /**
+   * Returns the standard deviation of the inserted elements.
+   *
+   * @return The standard deviation.
+   */
+  double standardDeviation();
+
+  /**
+   * Returns the mean of the inserted elements.
+   *
+   * @return The mean.
+   */
+  double mean();
+
+  /**
+   * Returns the min of the inserted elements.
+   *
+   * @return The smallest inserted element.
+   */
+  long min();
+
+  /**
+   * Returns the max of the inserted elements.
+   *
+   * @return The largest inserted element.
+   */
+  long max();
+
+  /**
+   * Returns the range of the inserted elements.
+   *
+   * @return The range.
+   */
+  long range();
+
+  /**
+   * Returns the sum of the inserted elements.
+   *
+   * @return The sum.
+   */
+  long sum();
+
+  /**
+   * Returns the number of the inserted elements.
+   *
+   * @return The number of inserted elements.
+   */
+  long populationSize();
+}

Reply via email to