Updated Branches: refs/heads/cassandra-0.8 c0a342bc8 -> 28a9a15e1 refs/heads/cassandra-1.1 62b0699c8 -> c6f12ef74 refs/heads/trunk 3f40c6e63 -> b7a8b57ca
Make BoundedStatsDeque threadsafe, removed AbstractStatsDeque and AdaptiveLatencyTracker. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-4019 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7a8b57c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7a8b57c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7a8b57c Branch: refs/heads/trunk Commit: b7a8b57ca653843e14bad544745a1ee7f0df5f44 Parents: 3f40c6e Author: Brandon Williams <[email protected]> Authored: Thu Mar 8 17:41:22 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Thu Mar 8 17:41:22 2012 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/FailureDetector.java | 24 +---- .../cassandra/locator/DynamicEndpointSnitch.java | 92 ++++----------- .../apache/cassandra/utils/AbstractStatsDeque.java | 71 ----------- .../apache/cassandra/utils/BoundedStatsDeque.java | 43 +++++-- .../cassandra/utils/BoundedStatsDequeTest.java | 6 - 5 files changed, 56 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index e95da1b..749592f 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -272,38 +272,18 @@ class ArrivalWindow tLast = value; } - synchronized double sum() - { - return arrivalIntervals.sum(); - } - - synchronized double sumOfDeviations() - { - return arrivalIntervals.sumOfDeviations(); - } - - synchronized double mean() + double mean() { return arrivalIntervals.mean(); } - synchronized double 
variance() - { - return arrivalIntervals.variance(); - } - - double stdev() - { - return arrivalIntervals.stdev(); - } - void clear() { arrivalIntervals.clear(); } // see CASSANDRA-2597 for an explanation of the math at work here. - synchronized double phi(long tnow) + double phi(long tnow) { int size = arrivalIntervals.size(); double t = tnow - tLast; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index e4f154a..0f3aaca 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.cassandra.locator; import java.lang.management.ManagementFactory; @@ -22,7 +23,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -31,7 +31,7 @@ import javax.management.ObjectName; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.AbstractStatsDeque; +import org.apache.cassandra.utils.BoundedStatsDeque; import org.apache.cassandra.utils.FBUtilities; /** @@ -42,14 +42,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private static final int UPDATES_PER_INTERVAL = 10000; private static final int WINDOW_SIZE = 100; - private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); - private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); - private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); - private final String mbeanName; + private int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); + private int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); + private double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); + private String mbeanName; private boolean registered = false; private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>(); - private final ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker>(); + private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = new ConcurrentHashMap<InetAddress, BoundedStatsDeque>(); 
private final AtomicInteger intervalupdates = new AtomicInteger(0); public final IEndpointSnitch subsnitch; @@ -172,7 +172,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { Double scored1 = scores.get(a1); Double scored2 = scores.get(a2); - + if (scored1 == null) { scored1 = 0.0; @@ -197,21 +197,21 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL) return; - AdaptiveLatencyTracker tracker = windows.get(host); - if (tracker == null) + BoundedStatsDeque deque = windows.get(host); + if (deque == null) { - AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE); - tracker = windows.putIfAbsent(host, alt); - if (tracker == null) - tracker = alt; + BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE); + deque = windows.putIfAbsent(host, maybeNewDeque); + if (deque == null) + deque = maybeNewDeque; } - tracker.add(latency); + deque.add(latency); intervalupdates.getAndIncrement(); } private void updateScores() // this is expensive { - if (!StorageService.instance.isInitialized()) + if (!StorageService.instance.isInitialized()) return; if (!registered) { @@ -222,18 +222,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } - for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet()) + for (Map.Entry<InetAddress, BoundedStatsDeque> entry: windows.entrySet()) { - scores.put(entry.getKey(), entry.getValue().score()); + scores.put(entry.getKey(), entry.getValue().mean()); } intervalupdates.set(0); } private void reset() { - for (AdaptiveLatencyTracker tracker : windows.values()) + for (BoundedStatsDeque deque : windows.values()) { - tracker.clear(); + deque.clear(); } } @@ -263,7 +263,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { InetAddress host = InetAddress.getByName(hostname); ArrayList<Double> timings = new 
ArrayList<Double>(); - AdaptiveLatencyTracker window = windows.get(host); + BoundedStatsDeque window = windows.get(host); if (window != null) { for (double time: window) @@ -275,51 +275,3 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } - -/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/ -class AdaptiveLatencyTracker extends AbstractStatsDeque -{ - private final LinkedBlockingDeque<Double> latencies; - - AdaptiveLatencyTracker(int size) - { - latencies = new LinkedBlockingDeque<Double>(size); - } - - public void add(double i) - { - if (!latencies.offer(i)) - { - try - { - latencies.remove(); - } - catch (NoSuchElementException e) - { - // oops, clear() beat us to it - } - latencies.offer(i); - } - } - - public void clear() - { - latencies.clear(); - } - - public Iterator<Double> iterator() - { - return latencies.iterator(); - } - - public int size() - { - return latencies.size(); - } - - double score() - { - return (size() > 0) ? mean() : 0.0; - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java b/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java deleted file mode 100644 index 85f158b..0000000 --- a/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils; - -import java.util.Iterator; - -public abstract class AbstractStatsDeque implements Iterable<Double> -{ - public abstract Iterator<Double> iterator(); - public abstract int size(); - public abstract void add(double o); - public abstract void clear(); - - // - // statistical methods - // - - public double sum() - { - double sum = 0d; - for (Double interval : this) - { - sum += interval; - } - return sum; - } - - public double sumOfDeviations() - { - double sumOfDeviations = 0d; - double mean = mean(); - - for (Double interval : this) - { - double v = interval - mean; - sumOfDeviations += v * v; - } - - return sumOfDeviations; - } - - public double mean() - { - return sum() / size(); - } - - public double variance() - { - return sumOfDeviations() / size(); - } - - public double stdev() - { - return Math.sqrt(variance()); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java b/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java index e3adee9..1d9bc7b 100644 --- a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java +++ b/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java @@ -17,21 +17,20 @@ */ package org.apache.cassandra.utils; -import java.util.ArrayDeque; import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.LinkedBlockingDeque; /** 
- * not threadsafe. caller is responsible for any locking necessary. + * threadsafe bounded deque with statistical functions */ -public class BoundedStatsDeque extends AbstractStatsDeque +public class BoundedStatsDeque implements Iterable<Double> { - private final int size; - protected final ArrayDeque<Double> deque; + protected final LinkedBlockingDeque<Double> deque; public BoundedStatsDeque(int size) { - this.size = size; - deque = new ArrayDeque<Double>(size); + deque = new LinkedBlockingDeque<Double>(size); } public Iterator<Double> iterator() @@ -49,12 +48,34 @@ public class BoundedStatsDeque extends AbstractStatsDeque deque.clear(); } - public void add(double o) + public void add(double i) { - if (size == deque.size()) + if (!deque.offer(i)) { - deque.remove(); + try + { + deque.remove(); + } + catch (NoSuchElementException e) + { + // oops, clear() beat us to it + } + deque.offer(i); } - deque.add(o); + } + + public double sum() + { + double sum = 0d; + for (Double interval : deque) + { + sum += interval; + } + return sum; + } + + public double mean() + { + return size() > 0 ? 
sum() / size() : 0; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java b/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java index fabb462..af540cc 100644 --- a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java +++ b/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java @@ -40,9 +40,6 @@ public class BoundedStatsDequeTest assertEquals(0, bsd.size()); assertEquals(0, bsd.sum(), 0.001d); assertEquals(Double.NaN, bsd.mean(), 0.001d); - assertEquals(Double.NaN, bsd.variance(), 0.001d); - assertEquals(Double.NaN, bsd.stdev(), 0.001d); - assertEquals(0, bsd.sumOfDeviations(), 0.001d); bsd.add(1d); //this one falls out, over limit bsd.add(2d); @@ -66,9 +63,6 @@ public class BoundedStatsDequeTest assertEquals(size, bsd.size()); assertEquals(14, bsd.sum(), 0.001d); assertEquals(3.5, bsd.mean(), 0.001d); - assertEquals(1.25, bsd.variance(), 0.001d); - assertEquals(1.1180d, bsd.stdev(), 0.001d); - assertEquals(5, bsd.sumOfDeviations(), 0.001d); //check that it clears properly bsd.clear();
