Author: jbellis
Date: Fri Dec 31 18:28:00 2010
New Revision: 1054156

URL: http://svn.apache.org/viewvc?rev=1054156&view=rev
Log:
Make snitches configurable at runtime
patch by Jon Hermes; reviewed by jbellis for CASSANDRA-1374

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec 31 18:28:00 2010
@@ -5,6 +5,7 @@ dev
  * avoid polluting page cache with commitlog or sstable writes
    and seq scan operations (CASSANDRA-1470)
  * add RMI authentication options to nodetool (CASSANDRA-1921)
+ * Make snitches configurable at runtime (CASSANDRA-1374)
 
 
 0.7.0-rc4

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Fri Dec 31 18:28:00 2010
@@ -400,7 +400,7 @@ public class    DatabaseDescriptor
         IEndpointSnitch snitch = 
FBUtilities.construct(endpointSnitchClassName, "snitch");
         return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : 
snitch;
     }
-    
+
     /** load keyspace (table) definitions, but do not initialize the table 
instances. */
     public static void loadSchemas() throws IOException                        
 
     {
@@ -705,6 +705,10 @@ public class    DatabaseDescriptor
     {
         return snitch;
     }
+    public static void setEndpointSnitch(IEndpointSnitch eps)
+    {
+        snitch = eps;
+    }
 
     public static IRequestScheduler getRequestScheduler()
     {
@@ -1104,14 +1108,26 @@ public class    DatabaseDescriptor
     {
         return conf.dynamic_snitch_update_interval_in_ms;
     }
+    public static void setDynamicUpdateInterval(Integer dynamicUpdateInterval)
+    {
+        conf.dynamic_snitch_update_interval_in_ms = dynamicUpdateInterval;
+    }
 
     public static int getDynamicResetInterval()
     {
         return conf.dynamic_snitch_reset_interval_in_ms;
     }
+    public static void setDynamicResetInterval(Integer dynamicResetInterval)
+    {
+        conf.dynamic_snitch_reset_interval_in_ms = dynamicResetInterval;
+    }
 
     public static double getDynamicBadnessThreshold()
     {
         return conf.dynamic_snitch_badness_threshold;
     }
+    public static void setDynamicBadnessThreshold(Double 
dynamicBadnessThreshold)
+    {
+        conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 Fri Dec 31 18:28:00 2010
@@ -46,9 +46,10 @@ public abstract class AbstractReplicatio
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
     public final String table;
-    private final TokenMetadata tokenMetadata;
-    public final IEndpointSnitch snitch;
     public final Map<String, String> configOptions;
+    private final TokenMetadata tokenMetadata;
+
+    public IEndpointSnitch snitch;
 
     AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, 
IEndpointSnitch snitch, Map<String, String> configOptions)
     {

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 Fri Dec 31 18:28:00 2010
@@ -40,19 +40,23 @@ import org.apache.cassandra.utils.FBUtil
 public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements 
ILatencySubscriber, DynamicEndpointSnitchMBean
 {
     private static final int UPDATES_PER_INTERVAL = 10000;
-    private static final int UPDATE_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicUpdateInterval();
-    private static final int RESET_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicResetInterval();
-    private static final double BADNESS_THRESHOLD = 
DatabaseDescriptor.getDynamicBadnessThreshold();
     private static final int WINDOW_SIZE = 100;
+
+    private int UPDATE_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicUpdateInterval();
+    private int RESET_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicResetInterval();
+    private double BADNESS_THRESHOLD = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+    private String mbeanName;
     private boolean registered = false;
 
     private final ConcurrentHashMap<InetAddress, Double> scores = new 
ConcurrentHashMap<InetAddress, Double>();
     private final ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> 
windows = new ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker>();
     private final AtomicInteger intervalupdates = new AtomicInteger(0);
+
     public final IEndpointSnitch subsnitch;
 
     public DynamicEndpointSnitch(IEndpointSnitch snitch)
     {
+        mbeanName = 
"org.apache.cassandra.db:type=DynamicEndpointSnitch,instance="+hashCode();
         subsnitch = snitch;
         Runnable update = new Runnable()
         {
@@ -72,11 +76,28 @@ public class DynamicEndpointSnitch exten
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(update, 
UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
         StorageService.scheduledTasks.scheduleWithFixedDelay(reset, 
RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        registerMBean();
+   }
 
+    private void registerMBean()
+    {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=DynamicEndpointSnitch,instance="+hashCode()));
+            mbs.registerMBean(this, new ObjectName(mbeanName));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void unregisterMBean()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.unregisterMBean(new ObjectName(mbeanName));
         }
         catch (Exception e)
         {
@@ -208,6 +229,25 @@ public class DynamicEndpointSnitch exten
     {
         return scores;
     }
+
+    public int getUpdateInterval()
+    {
+        return UPDATE_INTERVAL_IN_MS;
+    }
+    public int getResetInterval()
+    {
+        return RESET_INTERVAL_IN_MS;
+    }
+    public double getBadnessThreshold()
+    {
+        return BADNESS_THRESHOLD;
+    }
+    public String getSubsnitchClassName()
+    {
+        return subsnitch.getClass().getName();
+    }
+
+
 }
 
 /** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification 
for arbitrary times **/

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
 Fri Dec 31 18:28:00 2010
@@ -24,4 +24,8 @@ import java.util.Map;
 
 public interface DynamicEndpointSnitchMBean {
     public Map<InetAddress, Double> getScores();
+    public int getUpdateInterval();
+    public int getResetInterval();
+    public double getBadnessThreshold();
+    public String getSubsnitchClassName();
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 Fri Dec 31 18:28:00 2010
@@ -27,11 +27,13 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.*;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -49,9 +51,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -2013,4 +2012,28 @@ public class StorageService implements I
         return Collections.unmodifiableList(tableslist);
     }
 
+    public void updateSnitch(String epSnitchClassName, Boolean dynamic, 
Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ConfigurationException
+    {
+        IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
+
+        // new snitch registers mbean during construction
+        IEndpointSnitch newSnitch = FBUtilities.construct(epSnitchClassName, 
"snitch");
+        if (dynamic)
+        {
+            DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval);
+            DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval);
+            
DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold);
+            newSnitch = new DynamicEndpointSnitch(newSnitch);
+        }
+
+        // point snitch references to the new instance
+        DatabaseDescriptor.setEndpointSnitch(newSnitch);
+        for (String ks : DatabaseDescriptor.getTables())
+        {
+            Table.open(ks).getReplicationStrategy().snitch = newSnitch;
+        }
+
+        if (oldSnitch instanceof DynamicEndpointSnitch)
+            ((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1054156&r1=1054155&r2=1054156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 Fri Dec 31 18:28:00 2010
@@ -259,4 +259,15 @@ public interface StorageServiceMBean
     public Map<Token, Float> getOwnership();
 
     public List<String> getKeyspaces();
+
+    /**
+     * Change endpointsnitch class and dynamic-ness (and dynamic attributes) 
at runtime
+     * @param epSnitchClassName        the canonical path name for a class 
implementing IEndpointSnitch
+     * @param dynamic                  boolean that decides whether 
dynamicsnitch is used or not
+     * @param dynamicUpdateInterval    integer, in ms (default 100)
+     * @param dynamicResetInterval     integer, in ms (default 600,000)
+     * @param dynamicBadnessThreshold  double, (default 0.0)
+     * @throws ConfigurationException  classname not found on classpath
+     */
+    public void updateSnitch(String epSnitchClassName, Boolean dynamic, 
Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ConfigurationException;
 }


Reply via email to