DynamicEndpointSnitch (dsnitch) now uses 'severity', latency, and time since last reply to compute endpoint scores. Patch by vijay and brandonwilliams; reviewed by vijay for CASSANDRA-3722
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98a70bde Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98a70bde Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98a70bde Branch: refs/heads/trunk Commit: 98a70bdebb26955ea629fe3a21212253fc7b7d17 Parents: 08345fa Author: Brandon Williams <[email protected]> Authored: Fri Mar 30 16:44:35 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Fri Mar 30 18:15:50 2012 -0500 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionInfo.java | 21 +++++++ .../cassandra/db/compaction/CompactionManager.java | 4 ++ .../org/apache/cassandra/gms/ApplicationState.java | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 2 +- .../org/apache/cassandra/gms/VersionedValue.java | 5 ++ .../cassandra/locator/DynamicEndpointSnitch.java | 43 ++++++++++++++- .../locator/DynamicEndpointSnitchMBean.java | 7 +++ .../org/apache/cassandra/net/MessagingService.java | 6 ++ .../apache/cassandra/service/StorageService.java | 23 ++++++++- .../locator/DynamicEndpointSnitchTest.java | 11 ++++- 10 files changed, 117 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index fda8dd3..594b639 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -21,6 +21,8 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.apache.cassandra.service.StorageService; + /** Implements 
serializable to allow structured info to be returned via JMX. */ public final class CompactionInfo implements Serializable { @@ -103,6 +105,8 @@ public final class CompactionInfo implements Serializable { private volatile boolean isStopped = false; public abstract CompactionInfo getCompactionInfo(); + double load = StorageService.instance.getLoad(); + boolean reportedSeverity = false; public void stop() { @@ -113,5 +117,22 @@ public final class CompactionInfo implements Serializable { return isStopped; } + /** + * report event on the size of the compaction. + */ + public void started() + { + reportedSeverity = StorageService.instance.reportSeverity(getCompactionInfo().getTotalBytes()/load); + } + + /** + * remove the event complete + */ + public void finished() + { + if (reportedSeverity) + StorageService.instance.reportSeverity(-(getCompactionInfo().getTotalBytes()/load)); + reportedSeverity = false; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 848abee..36f05c1 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1017,11 +1017,15 @@ public class CompactionManager implements CompactionManagerMBean public void beginCompaction(CompactionInfo.Holder ci) { + // notify + ci.started(); compactions.add(ci); } public void finishCompaction(CompactionInfo.Holder ci) { + // notify + ci.finished(); compactions.remove(ci); totalBytesCompacted += ci.getCompactionInfo().getTotalBytes(); totalCompactionsCompleted += 1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/ApplicationState.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index 48e8d84..4520426 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -28,6 +28,7 @@ public enum ApplicationState REMOVAL_COORDINATOR, INTERNAL_IP, RPC_ADDRESS, + SEVERITY, // pad to allow adding new states to existing cluster X1, X2, http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index e6a2a3c..2a948a6 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -1160,7 +1160,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public boolean isEnabled() { - return !scheduledGossipTask.isCancelled(); + return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index eccca1f..36ff1d9 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -178,6 +178,11 @@ public class VersionedValue implements Comparable<VersionedValue> { return new VersionedValue(private_ip); } + + public VersionedValue severity(double value) + { + return new VersionedValue(String.valueOf(value)); + } } private static class VersionedValueSerializer implements 
IVersionedSerializer<VersionedValue> http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 0f3aaca..3b80e67 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -49,6 +49,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private boolean registered = false; private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>(); + private final ConcurrentHashMap<InetAddress, Long> lastReceived = new ConcurrentHashMap<InetAddress, Long>(); private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = new ConcurrentHashMap<InetAddress, BoundedStatsDeque>(); private final AtomicInteger intervalupdates = new AtomicInteger(0); @@ -56,7 +57,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public DynamicEndpointSnitch(IEndpointSnitch snitch) { - mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch,instance="+hashCode(); + this(snitch, null); + } + public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) + { + mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch"; + if (instance != null) + mbeanName += ",instance=" + instance; subsnitch = snitch; Runnable update = new Runnable() { @@ -195,12 +202,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public void receiveTiming(InetAddress host, Double latency) // this is cheap { + lastReceived.put(host, System.currentTimeMillis()); if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL) return; BoundedStatsDeque deque = windows.get(host); if (deque == 
null) { - BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE); + BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE); deque = windows.putIfAbsent(host, maybeNewDeque); if (deque == null) deque = maybeNewDeque; @@ -222,13 +230,32 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } + double maxLatency = 1; + long maxPenalty = 1; + HashMap<InetAddress, Long> penalties = new HashMap<InetAddress, Long>(); + for (Map.Entry<InetAddress, BoundedStatsDeque> entry : windows.entrySet()) + { + double mean = entry.getValue().mean(); + if (mean > maxLatency) + maxLatency = mean; + long timePenalty = lastReceived.containsKey(entry.getKey()) ? lastReceived.get(entry.getKey()) : System.currentTimeMillis(); + timePenalty = System.currentTimeMillis() - timePenalty; + timePenalty = timePenalty > UPDATE_INTERVAL_IN_MS ? UPDATE_INTERVAL_IN_MS : timePenalty; + penalties.put(entry.getKey(), timePenalty); + if (timePenalty > maxPenalty) + maxPenalty = timePenalty; + } for (Map.Entry<InetAddress, BoundedStatsDeque> entry: windows.entrySet()) { - scores.put(entry.getKey(), entry.getValue().mean()); + double score = entry.getValue().mean() / maxLatency; + score += penalties.get(entry.getKey()) / maxPenalty; + score += StorageService.instance.getSeverity(entry.getKey()); + scores.put(entry.getKey(), score); } intervalupdates.set(0); } + private void reset() { for (BoundedStatsDeque deque : windows.values()) @@ -274,4 +301,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return timings; } + public void setSeverity(double severity) + { + StorageService.instance.reportSeverity(severity); + } + + public double getSeverity() + { + return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress()); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java index 7c423c8..becbacf 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java @@ -29,4 +29,11 @@ public interface DynamicEndpointSnitchMBean { public double getBadnessThreshold(); public String getSubsnitchClassName(); public List<Double> dumpTimings(String hostname) throws UnknownHostException; + /** + * Use this if you want to specify a severity it can be -ve + * Example: Page cache is cold and you want data to be sent + * though it is not preferred one. + */ + public void setSeverity(double severity); + public double getSeverity(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index f5379de..f3e2600 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -715,6 +715,12 @@ public final class MessagingService implements MessagingServiceMBean return pendingTasks; } + public int getCommandPendingTasks(InetAddress address) + { + OutboundTcpConnectionPool connection = connectionManagers.get(address); + return connection == null ? 
0 : connection.cmdCon.getPendingMessages(); + } + public Map<String, Long> getCommandCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4e43964..4b12383 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -48,7 +48,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.*; -import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableLoader; @@ -785,6 +784,28 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } /** + * Gossip about the known severity of the events in this node + */ + public synchronized boolean reportSeverity(double incr) + { + if (!Gossiper.instance.isEnabled()) + return false; + double update = getSeverity(FBUtilities.getBroadcastAddress()) + incr; + VersionedValue updated = StorageService.instance.valueFactory.severity(update); + Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated); + return true; + } + + public double getSeverity(InetAddress endpoint) + { + VersionedValue event; + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null) + return Double.parseDouble(event.value); + return 0.0; + } + + /** * for a keyspace, return the ranges and corresponding listen addresses. 
* @param keyspace * @return http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java index 7d44f27..decd59a 100644 --- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java @@ -37,7 +37,8 @@ public class DynamicEndpointSnitchTest // do this because SS needs to be initialized before DES can work properly. StorageService.instance.initClient(0); int sleeptime = 150; - DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch()); + SimpleSnitch ss = new SimpleSnitch(); + DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); InetAddress self = FBUtilities.getBroadcastAddress(); ArrayList<InetAddress> order = new ArrayList<InetAddress>(); InetAddress host1 = InetAddress.getByName("127.0.0.1"); @@ -61,6 +62,8 @@ public class DynamicEndpointSnitchTest // make host1 a little worse dsnitch.receiveTiming(host1, 2.0); + dsnitch.receiveTiming(host2, 1.0); + dsnitch.receiveTiming(host3, 1.0); Thread.sleep(sleeptime); order.clear(); @@ -71,6 +74,8 @@ public class DynamicEndpointSnitchTest // make host2 as bad as host1 dsnitch.receiveTiming(host2, 2.0); + dsnitch.receiveTiming(host1, 1.0); + dsnitch.receiveTiming(host3, 1.0); Thread.sleep(sleeptime); order.clear(); @@ -82,6 +87,8 @@ public class DynamicEndpointSnitchTest // make host3 the worst for (int i = 0; i < 2; i++) { + dsnitch.receiveTiming(host1, 1.0); + dsnitch.receiveTiming(host2, 1.0); dsnitch.receiveTiming(host3, 2.0); } Thread.sleep(sleeptime); @@ -95,6 +102,8 @@ public class DynamicEndpointSnitchTest // make host3 equal to the others for (int i = 0; i < 2; i++) { + 
dsnitch.receiveTiming(host1, 1.0); + dsnitch.receiveTiming(host2, 1.0); dsnitch.receiveTiming(host3, 1.0); } Thread.sleep(sleeptime);
