Updated Branches: refs/heads/trunk 253f86e31 -> 3220af1fe
TRIVIAL: Fix misc. numerical issues in histogram. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3220af1f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3220af1f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3220af1f Branch: refs/heads/trunk Commit: 3220af1fe2fad8da1f5bcc101ab9d3e8919b03bd Parents: 253f86e Author: Jay Kreps <[email protected]> Authored: Thu Feb 6 10:49:37 2014 -0800 Committer: Jay Kreps <[email protected]> Committed: Thu Feb 6 10:49:37 2014 -0800 ---------------------------------------------------------------------- .../kafka/common/metrics/stats/Histogram.java | 8 ++- .../kafka/common/metrics/stats/Percentiles.java | 74 ++++++++++++++------ .../kafka/common/metrics/stats/SampledStat.java | 12 ++-- .../java/kafka/common/metrics/MetricsTest.java | 34 +++++++++ .../common/metrics/stats/HistogramTest.java | 44 ++++++++++-- 5 files changed, 136 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/Histogram.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java index c59b585..9922571 100644 --- a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java +++ b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java @@ -18,7 +18,7 @@ public class Histogram { } public double value(double quantile) { - if (count == 0L) + if (count == 0.0d) return Double.NaN; float sum = 0.0f; float quant = (float) quantile; @@ -30,6 +30,10 @@ public class Histogram { return Float.POSITIVE_INFINITY; } + public float[] counts() { + return this.hist; + } + public void clear() { for (int i = 0; i < this.hist.length; i++) this.hist[i] = 0.0f; @@ -117,7 +121,7 @@ public class Histogram { if (b == this.bins - 1) { return Float.POSITIVE_INFINITY; } else { - double unscaled = (b * (b - 1.0)) / 2.0; + double unscaled = (b * (b + 1.0)) / 2.0; return unscaled * this.scale; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java index 686c726..c3f8942 100644 --- a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java +++ b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java @@ -13,35 +13,33 @@ import kafka.common.metrics.stats.Histogram.LinearBinScheme; /** * A compound stat that reports one or more percentiles */ -public class Percentiles implements CompoundStat { +public class Percentiles extends SampledStat implements CompoundStat { public static enum BucketSizing { CONSTANT, LINEAR } + private final int buckets; private final Percentile[] percentiles; - private Histogram current; - private Histogram shadow; - private long lastWindow; - private long eventCount; + private final BinScheme binScheme; public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) { this(sizeInBytes, 0.0, max, bucketing, percentiles); } public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) { + super(0.0); this.percentiles = percentiles; - BinScheme scheme = null; + this.buckets = sizeInBytes / 4; if (bucketing == BucketSizing.CONSTANT) { - scheme = new ConstantBinScheme(sizeInBytes / 4, min, max); + this.binScheme = new ConstantBinScheme(buckets, min, max); } else if (bucketing == BucketSizing.LINEAR) { if (min != 0.0d) throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0."); - scheme = new LinearBinScheme(sizeInBytes / 4, max); + this.binScheme = new LinearBinScheme(buckets, max); + } else { + throw new IllegalArgumentException("Unknown bucket type: " + bucketing); } - this.current = new Histogram(scheme); - this.shadow = new Histogram(scheme); - this.eventCount = 0L; } @Override @@ -51,26 +49,56 @@ public class Percentiles implements CompoundStat { final double pct = percentile.percentile(); ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() { public double measure(MetricConfig config, long now) { - return current.value(pct / 100.0); + return value(config, now, pct / 100.0); } })); } return ms; } + public double value(MetricConfig config, long now, double quantile) { + timeoutObsoleteSamples(config, now); + float count = 0.0f; + for (Sample sample : this.samples) + count += sample.eventCount; + if (count == 0.0f) + return Double.NaN; + float sum = 0.0f; + float quant = (float) quantile; + for (int b = 0; b < buckets; b++) { + for (int s = 0; s < this.samples.size(); s++) { + HistogramSample sample = (HistogramSample) this.samples.get(s); + float[] hist = sample.histogram.counts(); + sum += hist[b]; + if (sum / count > quant) + return binScheme.fromBin(b); + } + } + return Double.POSITIVE_INFINITY; + } + + public double combine(List<Sample> samples, MetricConfig config, long now) { + return value(config, now, 0.5); + } + @Override - public void record(MetricConfig config, double value, long time) { - long ellapsed = time - this.lastWindow; - if (ellapsed > config.timeWindowNs() / 2 || this.eventCount > config.eventWindow() / 2) - this.shadow.clear(); - if (ellapsed > config.timeWindowNs() || this.eventCount > config.eventWindow()) { - Histogram tmp = this.current; - this.current = this.shadow; - this.shadow = tmp; - this.shadow.clear(); + protected HistogramSample newSample(long now) { + return new HistogramSample(this.binScheme, now); + } + + @Override + protected void update(Sample sample, MetricConfig config, double value, long now) { + HistogramSample hist = (HistogramSample) sample; + hist.histogram.record(value); + } + + private static class HistogramSample extends SampledStat.Sample { + private final Histogram histogram; + + private HistogramSample(BinScheme scheme, long now) { + super(0.0, now); + this.histogram = new Histogram(scheme); } - this.current.record(value); - this.shadow.record(value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java index 6f820fa..e696af5 100644 --- a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java @@ -20,7 +20,7 @@ public abstract class SampledStat implements MeasurableStat { private double initialValue; private int current = 0; - private List<Sample> samples; + protected List<Sample> samples; public SampledStat(double initialValue) { this.initialValue = initialValue; @@ -39,7 +39,7 @@ public abstract class SampledStat implements MeasurableStat { private Sample advance(MetricConfig config, long now) { this.current = (this.current + 1) % config.samples(); if (this.current >= samples.size()) { - Sample sample = new Sample(this.initialValue, now); + Sample sample = newSample(now); this.samples.add(sample); return sample; } else { @@ -49,6 +49,10 @@ public abstract class SampledStat implements MeasurableStat { } } + protected Sample newSample(long now) { + return new Sample(this.initialValue, now); + } + @Override public double measure(MetricConfig config, long now) { timeoutObsoleteSamples(config, now); @@ -57,7 +61,7 @@ public abstract class SampledStat implements MeasurableStat { public Sample current(long now) { if (samples.size() == 0) - this.samples.add(new Sample(initialValue, now)); + this.samples.add(newSample(now)); return this.samples.get(this.current); } @@ -70,7 +74,7 @@ public abstract class SampledStat implements MeasurableStat { public abstract double combine(List<Sample> samples, MetricConfig config, long now); /* Timeout any windows that have expired in the absense of any events */ - private void timeoutObsoleteSamples(MetricConfig config, long now) { + protected void timeoutObsoleteSamples(MetricConfig config, long now) { for (int i = 0; i < samples.size(); i++) { int idx = (this.current + i) % samples.size(); Sample sample = this.samples.get(idx); http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/test/java/kafka/common/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/kafka/common/metrics/MetricsTest.java index 7d06864..f66cc7f 100644 --- a/clients/src/test/java/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/kafka/common/metrics/MetricsTest.java @@ -6,6 +6,7 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.concurrent.TimeUnit; +import kafka.common.Metric; import kafka.common.metrics.stats.Avg; import kafka.common.metrics.stats.Count; import kafka.common.metrics.stats.Max; @@ -163,6 +164,39 @@ public class MetricsTest { } } + @Test + public void testPercentiles() { + int buckets = 100; + Percentiles percs = new Percentiles(4 * buckets, + 0.0, + 100.0, + BucketSizing.CONSTANT, + new Percentile("test.p25", 25), + new Percentile("test.p50", 50), + new Percentile("test.p75", 75)); + MetricConfig config = new MetricConfig().eventWindow(50).samples(2); + Sensor sensor = metrics.sensor("test", config); + sensor.add(percs); + Metric p25 = this.metrics.metrics().get("test.p25"); + Metric p50 = this.metrics.metrics().get("test.p50"); + Metric p75 = this.metrics.metrics().get("test.p75"); + + // record two windows worth of sequential values + for (int i = 0; i < buckets; i++) + sensor.record(i); + + assertEquals(25, p25.value(), 1.0); + assertEquals(50, p50.value(), 1.0); + assertEquals(75, p75.value(), 1.0); + + for (int i = 0; i < buckets; i++) + sensor.record(0.0); + + assertEquals(0.0, p25.value(), 1.0); + assertEquals(0.0, p50.value(), 1.0); + assertEquals(0.0, p75.value(), 1.0); + } + public static class ConstantMeasurable implements Measurable { public double value = 0.0; http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java index 03bdd2b..9c6a4ab 100644 --- a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java +++ b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java @@ -1,6 +1,10 @@ package kafka.common.metrics.stats; import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Random; + import kafka.common.metrics.stats.Histogram.BinScheme; import kafka.common.metrics.stats.Histogram.ConstantBinScheme; import kafka.common.metrics.stats.Histogram.LinearBinScheme; @@ -11,14 +15,14 @@ public class HistogramTest { private static final double EPS = 0.0000001d; - // @Test + @Test public void testHistogram() { BinScheme scheme = new ConstantBinScheme(12, -5, 5); Histogram hist = new Histogram(scheme); for (int i = -5; i < 5; i++) hist.record(i); for (int i = 0; i < 10; i++) - assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0), EPS); + assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0 + EPS), EPS); } @Test @@ -33,18 +37,17 @@ public class HistogramTest { checkBinningConsistency(scheme); } + @Test public void testLinearBinScheme() { - LinearBinScheme scheme = new LinearBinScheme(5, 5); - for (int i = 0; i < scheme.bins(); i++) - System.out.println(i + " " + scheme.fromBin(i)); + LinearBinScheme scheme = new LinearBinScheme(10, 10); checkBinningConsistency(scheme); } private void checkBinningConsistency(BinScheme scheme) { for (int bin = 0; bin < scheme.bins(); bin++) { double fromBin = scheme.fromBin(bin); - int binAgain = scheme.toBin(fromBin); - assertEquals("unbinning and rebinning " + bin + int binAgain = scheme.toBin(fromBin + EPS); + assertEquals("unbinning and rebinning the bin " + bin + " gave a different result (" + fromBin + " was placed in bin " @@ -53,4 +56,31 @@ public class HistogramTest { } } + public static void main(String[] args) { + Random random = new Random(); + System.out.println("[-100, 100]:"); + for (BinScheme scheme : Arrays.asList(new ConstantBinScheme(1000, -100, 100), + new ConstantBinScheme(100, -100, 100), + new ConstantBinScheme(10, -100, 100))) { + Histogram h = new Histogram(scheme); + for (int i = 0; i < 10000; i++) + h.record(200.0 * random.nextDouble() - 100.0); + for (double quantile = 0.0; quantile < 1.0; quantile += 0.05) + System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile)); + System.out.println(); + } + + System.out.println("[0, 1000]"); + for (BinScheme scheme : Arrays.asList(new LinearBinScheme(1000, 1000), + new LinearBinScheme(100, 1000), + new LinearBinScheme(10, 1000))) { + Histogram h = new Histogram(scheme); + for (int i = 0; i < 10000; i++) + h.record(1000.0 * random.nextDouble()); + for (double quantile = 0.0; quantile < 1.0; quantile += 0.05) + System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile)); + System.out.println(); + } + } + }
