Updated Branches:
  refs/heads/cassandra-0.8 c0a342bc8 -> 28a9a15e1
  refs/heads/cassandra-1.1 62b0699c8 -> c6f12ef74
  refs/heads/trunk 3f40c6e63 -> b7a8b57ca


Make BoundedStatsDeque threadsafe, removed AbstractStateDeque and
AdaptiveLatencyTracker.
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-4019


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7a8b57c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7a8b57c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7a8b57c

Branch: refs/heads/trunk
Commit: b7a8b57ca653843e14bad544745a1ee7f0df5f44
Parents: 3f40c6e
Author: Brandon Williams <[email protected]>
Authored: Thu Mar 8 17:41:22 2012 -0600
Committer: Brandon Williams <[email protected]>
Committed: Thu Mar 8 17:41:22 2012 -0600

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/FailureDetector.java  |   24 +----
 .../cassandra/locator/DynamicEndpointSnitch.java   |   92 ++++-----------
 .../apache/cassandra/utils/AbstractStatsDeque.java |   71 -----------
 .../apache/cassandra/utils/BoundedStatsDeque.java  |   43 +++++--
 .../cassandra/utils/BoundedStatsDequeTest.java     |    6 -
 5 files changed, 56 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e95da1b..749592f 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -272,38 +272,18 @@ class ArrivalWindow
         tLast = value;
     }
 
-    synchronized double sum()
-    {
-        return arrivalIntervals.sum();
-    }
-
-    synchronized double sumOfDeviations()
-    {
-        return arrivalIntervals.sumOfDeviations();
-    }
-
-    synchronized double mean()
+    double mean()
     {
         return arrivalIntervals.mean();
     }
 
-    synchronized double variance()
-    {
-        return arrivalIntervals.variance();
-    }
-
-    double stdev()
-    {
-        return arrivalIntervals.stdev();
-    }
-
     void clear()
     {
         arrivalIntervals.clear();
     }
 
     // see CASSANDRA-2597 for an explanation of the math at work here.
-    synchronized double phi(long tnow)
+    double phi(long tnow)
     {
         int size = arrivalIntervals.size();
         double t = tnow - tLast;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index e4f154a..0f3aaca 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.locator;
 
 import java.lang.management.ManagementFactory;
@@ -22,7 +23,6 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -31,7 +31,7 @@ import javax.management.ObjectName;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.AbstractStatsDeque;
+import org.apache.cassandra.utils.BoundedStatsDeque;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -42,14 +42,14 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     private static final int UPDATES_PER_INTERVAL = 10000;
     private static final int WINDOW_SIZE = 100;
 
-    private final int UPDATE_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicUpdateInterval();
-    private final int RESET_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicResetInterval();
-    private final double BADNESS_THRESHOLD = 
DatabaseDescriptor.getDynamicBadnessThreshold();
-    private final String mbeanName;
+    private int UPDATE_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicUpdateInterval();
+    private int RESET_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicResetInterval();
+    private double BADNESS_THRESHOLD = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+    private String mbeanName;
     private boolean registered = false;
 
     private final ConcurrentHashMap<InetAddress, Double> scores = new 
ConcurrentHashMap<InetAddress, Double>();
-    private final ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> 
windows = new ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker>();
+    private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = 
new ConcurrentHashMap<InetAddress, BoundedStatsDeque>();
     private final AtomicInteger intervalupdates = new AtomicInteger(0);
 
     public final IEndpointSnitch subsnitch;
@@ -172,7 +172,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     {
         Double scored1 = scores.get(a1);
         Double scored2 = scores.get(a2);
-
+        
         if (scored1 == null)
         {
             scored1 = 0.0;
@@ -197,21 +197,21 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     {
         if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
             return;
-        AdaptiveLatencyTracker tracker = windows.get(host);
-        if (tracker == null)
+        BoundedStatsDeque deque = windows.get(host);
+        if (deque == null)
         {
-            AdaptiveLatencyTracker alt = new 
AdaptiveLatencyTracker(WINDOW_SIZE);
-            tracker = windows.putIfAbsent(host, alt);
-            if (tracker == null)
-                tracker = alt;
+            BoundedStatsDeque maybeNewDeque  = new 
BoundedStatsDeque(WINDOW_SIZE);
+            deque = windows.putIfAbsent(host, maybeNewDeque);
+            if (deque == null)
+                deque = maybeNewDeque;
         }
-        tracker.add(latency);
+        deque.add(latency);
         intervalupdates.getAndIncrement();
     }
 
     private void updateScores() // this is expensive
     {
-        if (!StorageService.instance.isInitialized())
+        if (!StorageService.instance.isInitialized()) 
             return;
         if (!registered)
         {
@@ -222,18 +222,18 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             }
 
         }
-        for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: 
windows.entrySet())
+        for (Map.Entry<InetAddress, BoundedStatsDeque> entry: 
windows.entrySet())
         {
-            scores.put(entry.getKey(), entry.getValue().score());
+            scores.put(entry.getKey(), entry.getValue().mean());
         }
         intervalupdates.set(0);
     }
 
     private void reset()
     {
-        for (AdaptiveLatencyTracker tracker : windows.values())
+        for (BoundedStatsDeque deque : windows.values())
         {
-            tracker.clear();
+            deque.clear();
         }
     }
 
@@ -263,7 +263,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     {
         InetAddress host = InetAddress.getByName(hostname);
         ArrayList<Double> timings = new ArrayList<Double>();
-        AdaptiveLatencyTracker window = windows.get(host);
+        BoundedStatsDeque window = windows.get(host);
         if (window != null)
         {
             for (double time: window)
@@ -275,51 +275,3 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     }
 
 }
-
-/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification 
for arbitrary times **/
-class AdaptiveLatencyTracker extends AbstractStatsDeque
-{
-    private final LinkedBlockingDeque<Double> latencies;
-
-    AdaptiveLatencyTracker(int size)
-    {
-        latencies = new LinkedBlockingDeque<Double>(size);
-    }
-
-    public void add(double i)
-    {
-        if (!latencies.offer(i))
-        {
-            try
-            {
-                latencies.remove();
-            }
-            catch (NoSuchElementException e)
-            {
-                // oops, clear() beat us to it
-            }
-            latencies.offer(i);
-        }
-    }
-
-    public void clear()
-    {
-        latencies.clear();
-    }
-
-    public Iterator<Double> iterator()
-    {
-        return latencies.iterator();
-    }
-
-    public int size()
-    {
-        return latencies.size();
-    }
-
-    double score()
-    {
-        return (size() > 0) ? mean() : 0.0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java 
b/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
deleted file mode 100644
index 85f158b..0000000
--- a/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.Iterator;
-
-public abstract class AbstractStatsDeque implements Iterable<Double>
-{
-    public abstract Iterator<Double> iterator();
-    public abstract int size();
-    public abstract void add(double o);
-    public abstract void clear();
-
-    //
-    // statistical methods
-    //
-
-    public double sum()
-    {
-        double sum = 0d;
-        for (Double interval : this)
-        {
-            sum += interval;
-        }
-        return sum;
-    }
-
-    public double sumOfDeviations()
-    {
-        double sumOfDeviations = 0d;
-        double mean = mean();
-
-        for (Double interval : this)
-        {
-            double v = interval - mean;
-            sumOfDeviations += v * v;
-        }
-
-        return sumOfDeviations;
-    }
-
-    public double mean()
-    {
-        return sum() / size();
-    }
-
-    public double variance()
-    {
-        return sumOfDeviations() / size();
-    }
-
-    public double stdev()
-    {
-        return Math.sqrt(variance());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java 
b/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
index e3adee9..1d9bc7b 100644
--- a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
+++ b/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
@@ -17,21 +17,20 @@
  */
 package org.apache.cassandra.utils;
 
-import java.util.ArrayDeque;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
- * not threadsafe.  caller is responsible for any locking necessary.
+ * threadsafe bounded deque with statistical functions
  */
-public class BoundedStatsDeque extends AbstractStatsDeque
+public class BoundedStatsDeque implements Iterable<Double>
 {
-    private final int size;
-    protected final ArrayDeque<Double> deque;
+    protected final LinkedBlockingDeque<Double> deque;
 
     public BoundedStatsDeque(int size)
     {
-        this.size = size;
-        deque = new ArrayDeque<Double>(size);
+        deque = new LinkedBlockingDeque<Double>(size);
     }
 
     public Iterator<Double> iterator()
@@ -49,12 +48,34 @@ public class BoundedStatsDeque extends AbstractStatsDeque
         deque.clear();
     }
 
-    public void add(double o)
+    public void add(double i)
     {
-        if (size == deque.size())
+        if (!deque.offer(i))
         {
-            deque.remove();
+            try
+            {
+                deque.remove();
+            }
+            catch (NoSuchElementException e)
+            {
+                // oops, clear() beat us to it
+            }
+            deque.offer(i);
         }
-        deque.add(o);
+    }
+
+    public double sum()
+    {
+        double sum = 0d;
+        for (Double interval : deque)
+        {
+            sum += interval;
+        }
+        return sum;
+    }
+
+    public double mean()
+    {
+        return size() > 0 ? sum() / size() : 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7a8b57c/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java 
b/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java
index fabb462..af540cc 100644
--- a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java
@@ -40,9 +40,6 @@ public class BoundedStatsDequeTest
         assertEquals(0, bsd.size());
         assertEquals(0, bsd.sum(), 0.001d);
         assertEquals(Double.NaN, bsd.mean(), 0.001d);
-        assertEquals(Double.NaN, bsd.variance(), 0.001d);
-        assertEquals(Double.NaN, bsd.stdev(), 0.001d);
-        assertEquals(0, bsd.sumOfDeviations(), 0.001d);
 
         bsd.add(1d); //this one falls out, over limit
         bsd.add(2d);
@@ -66,9 +63,6 @@ public class BoundedStatsDequeTest
         assertEquals(size, bsd.size());
         assertEquals(14, bsd.sum(), 0.001d);
         assertEquals(3.5, bsd.mean(), 0.001d);
-        assertEquals(1.25, bsd.variance(), 0.001d);
-        assertEquals(1.1180d, bsd.stdev(), 0.001d);
-        assertEquals(5, bsd.sumOfDeviations(), 0.001d);
 
         //check that it clears properly
         bsd.clear();

Reply via email to