Repository: cassandra Updated Branches: refs/heads/trunk 221f27a6d -> 3472cf06c
Allow updating DynamicEndpointSnitch properties via JMX patch by sankalp kohli; reviewed by Robert Stupp for CASSANDRA-12179 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3472cf06 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3472cf06 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3472cf06 Branch: refs/heads/trunk Commit: 3472cf06ce9a811381a1548934b7a0b343616197 Parents: 221f27a Author: sankalp kohli <[email protected]> Authored: Thu Aug 4 20:37:36 2016 +0200 Committer: Robert Stupp <[email protected]> Committed: Thu Aug 4 20:37:36 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 6 +- .../cassandra/config/DatabaseDescriptor.java | 6 +- .../locator/DynamicEndpointSnitch.java | 68 ++++++++++---- .../cassandra/service/StorageService.java | 93 +++++++++++++++----- .../cassandra/service/StorageServiceMBean.java | 28 ++++-- 6 files changed, 153 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 98f9c24..c7063d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) * Collect metrics on queries by consistency level (CASSANDRA-7384) * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 64f06b9..5345a57 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -210,9 +210,9 @@ public class Config public String endpoint_snitch; public Boolean dynamic_snitch = true; - public Integer dynamic_snitch_update_interval_in_ms = 100; - public Integer dynamic_snitch_reset_interval_in_ms = 600000; - public Double dynamic_snitch_badness_threshold = 0.1; + public volatile Integer dynamic_snitch_update_interval_in_ms = 100; + public volatile Integer dynamic_snitch_reset_interval_in_ms = 600000; + public volatile Double dynamic_snitch_badness_threshold = 0.1; public String request_scheduler; public RequestSchedulerId request_scheduler_id; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 15aee59..3f367b4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -477,7 +477,7 @@ public class DatabaseDescriptor { throw new ConfigurationException("Missing endpoint_snitch directive", false); } - snitch = createEndpointSnitch(conf.endpoint_snitch); + snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); EndpointSnitchInfo.create(); localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); @@ -858,12 +858,12 @@ public class DatabaseDescriptor } } - private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException + public static IEndpointSnitch createEndpointSnitch(boolean dynamic, String snitchClassName) throws ConfigurationException { if (!snitchClassName.contains(".")) snitchClassName = "org.apache.cassandra.locator." + snitchClassName; IEndpointSnitch snitch = FBUtilities.construct(snitchClassName, "snitch"); - return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch; + return dynamic ? new DynamicEndpointSnitch(snitch) : snitch; } public static IAuthenticator getAuthenticator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 975b10e..de0cdde 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import com.codahale.metrics.ExponentiallyDecayingReservoir; @@ -50,13 +51,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values private static final int WINDOW_SIZE = 100; - private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); - private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); - private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); + private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); + private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); + private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to // warrant not merging two ranges into a single range - private double RANGE_MERGING_PREFERENCE = 1.5; + private static final double RANGE_MERGING_PREFERENCE = 1.5; private String mbeanName; private boolean registered = false; @@ -66,24 +67,31 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public final IEndpointSnitch subsnitch; + private volatile ScheduledFuture<?> updateSchedular; + private volatile ScheduledFuture<?> resetSchedular; + + private final Runnable update; + private final Runnable reset; + public DynamicEndpointSnitch(IEndpointSnitch snitch) { this(snitch, null); } + public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) { mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch"; if (instance != null) mbeanName += ",instance=" + instance; subsnitch = snitch; - Runnable update = new Runnable() + update = new Runnable() { public void run() { updateScores(); } }; - Runnable reset = new Runnable() + reset = new Runnable() { public void run() { @@ -92,10 +100,33 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa reset(); } }; - ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); - ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS); + resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS); registerMBean(); - } + } + + /** + * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks + * if the configured rates for these tasks have changed. + */ + public void applyConfigChanges() + { + if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval()) + { + dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); + updateSchedular.cancel(false); + updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS); + } + + if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval()) + { + dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); + resetSchedular.cancel(false); + resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS); + } + + dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); + } private void registerMBean() { @@ -110,8 +141,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } - public void unregisterMBean() + public void close() { + updateSchedular.cancel(false); + resetSchedular.cancel(false); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { @@ -150,7 +184,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public void sortByProximity(final InetAddress address, List<InetAddress> addresses) { assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself - if (BADNESS_THRESHOLD == 0) + if (dynamicBadnessThreshold == 0) { sortByProximityWithScore(address, addresses); } @@ -194,7 +228,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } // Sort the scores and then compare them (positionally) to the scores in the subsnitch order. - // If any of the subsnitch-ordered scores exceed the optimal/sorted score by BADNESS_THRESHOLD, use + // If any of the subsnitch-ordered scores exceed the optimal/sorted score by dynamicBadnessThreshold, use // the score-sorted ordering instead of the subsnitch ordering. ArrayList<Double> sortedScores = new ArrayList<>(subsnitchOrderedScores); Collections.sort(sortedScores); @@ -202,7 +236,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa Iterator<Double> sortedScoreIterator = sortedScores.iterator(); for (Double subsnitchScore : subsnitchOrderedScores) { - if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + BADNESS_THRESHOLD))) + if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold))) { sortByProximityWithScore(address, addresses); return; @@ -306,15 +340,15 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public int getUpdateInterval() { - return UPDATE_INTERVAL_IN_MS; + return dynamicUpdateInterval; } public int getResetInterval() { - return RESET_INTERVAL_IN_MS; + return dynamicResetInterval; } public double getBadnessThreshold() { - return BADNESS_THRESHOLD; + return dynamicBadnessThreshold; } public String getSubsnitchClassName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6373df2..eade850 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4434,37 +4434,88 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Collections.unmodifiableMap(result); } - public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException + public void setDynamicUpdateInterval(int dynamicUpdateInterval) { - IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch(); - - // new snitch registers mbean during construction - IEndpointSnitch newSnitch; - try + if (DatabaseDescriptor.getEndpointSnitch() instanceof DynamicEndpointSnitch) { - newSnitch = FBUtilities.construct(epSnitchClassName, "snitch"); - } - catch (ConfigurationException e) - { - throw new ClassNotFoundException(e.getMessage()); + + try + { + updateSnitch(null, true, dynamicUpdateInterval, null, null); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } } - if (dynamic) - { + } + + public int getDynamicUpdateInterval() + { + return DatabaseDescriptor.getDynamicUpdateInterval(); + } + + public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException + { + // apply dynamic snitch configuration + if (dynamicUpdateInterval != null) DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval); + if (dynamicResetInterval != null) DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval); + if (dynamicBadnessThreshold != null) DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold); - newSnitch = new DynamicEndpointSnitch(newSnitch); - } - // point snitch references to the new instance - DatabaseDescriptor.setEndpointSnitch(newSnitch); - for (String ks : Schema.instance.getKeyspaces()) + IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch(); + + // new snitch registers mbean during construction + if(epSnitchClassName != null) { - Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch; + + // need to unregister the mbean _before_ the new dynamic snitch is instantiated (and implicitly initialized + // and its mbean registered) + if (oldSnitch instanceof DynamicEndpointSnitch) + ((DynamicEndpointSnitch)oldSnitch).close(); + + IEndpointSnitch newSnitch; + try + { + newSnitch = DatabaseDescriptor.createEndpointSnitch(dynamic != null && dynamic, epSnitchClassName); + } + catch (ConfigurationException e) + { + throw new ClassNotFoundException(e.getMessage()); + } + + if (newSnitch instanceof DynamicEndpointSnitch) + { + logger.info("Created new dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}", + ((DynamicEndpointSnitch)newSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(), + DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold()); + } + else + { + logger.info("Created new non-dynamic snitch {}", newSnitch.getClass().getName()); + } + + // point snitch references to the new instance + DatabaseDescriptor.setEndpointSnitch(newSnitch); + for (String ks : Schema.instance.getKeyspaces()) + { + Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch; + } } + else + { + if (oldSnitch instanceof DynamicEndpointSnitch) + { + logger.info("Applying config change to dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}", + ((DynamicEndpointSnitch)oldSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(), + DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold()); - if (oldSnitch instanceof DynamicEndpointSnitch) - ((DynamicEndpointSnitch)oldSnitch).unregisterMBean(); + DynamicEndpointSnitch snitch = (DynamicEndpointSnitch)oldSnitch; + snitch.applyConfigChanges(); + } + } updateTopology(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index abb10c1..2e5651a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -463,15 +463,33 @@ public interface StorageServiceMBean extends NotificationEmitter public Map<String, String> getViewBuildStatuses(String keyspace, String view); /** - * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime + * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime. + * + * This method is used to change the snitch implementation and/or dynamic snitch parameters. + * If {@code epSnitchClassName} is specified, it will configure a new snitch instance and make it a + * 'dynamic snitch' if {@code dynamic} is specified and {@code true}. + * + * The parameters {@code dynamicUpdateInterval}, {@code dynamicResetInterval} and {@code dynamicBadnessThreshold} + * can be specified individually to update the parameters of the dynamic snitch during runtime. + * * @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch - * @param dynamic boolean that decides whether dynamicsnitch is used or not - * @param dynamicUpdateInterval integer, in ms (default 100) - * @param dynamicResetInterval integer, in ms (default 600,000) - * @param dynamicBadnessThreshold double, (default 0.0) + * @param dynamic boolean that decides whether dynamicsnitch is used or not - only valid, if {@code epSnitchClassName} is specified + * @param dynamicUpdateInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 100) + * @param dynamicResetInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 600,000) + * @param dynamicBadnessThreshold double, (defaults to the value configured in cassandra.yaml, which defaults to 0.0) */ public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException; + /* + Update dynamic_snitch_update_interval_in_ms + */ + public void setDynamicUpdateInterval(int dynamicUpdateInterval); + + /* + Get dynamic_snitch_update_interval_in_ms + */ + public int getDynamicUpdateInterval(); + // allows a user to forcibly 'kill' a sick node public void stopGossiping();
