Updated Branches: refs/heads/trunk c33ccd9e3 -> bdf08364a
Improve Dsnitch Severity with iowait monitor patch by Vijay; reviewed by jbellis for CASSANDRA-5521 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdf08364 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdf08364 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdf08364 Branch: refs/heads/trunk Commit: bdf08364a60d9b05d0b45cb47ddd73adb08b4196 Parents: c33ccd9 Author: Vijay Parthasarathy <[email protected]> Authored: Fri May 3 02:03:10 2013 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Fri May 3 02:03:10 2013 -0700 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionInfo.java | 3 +- .../apache/cassandra/service/StorageService.java | 18 +-- .../cassandra/utils/BackgroundActivityMonitor.java | 149 +++++++++++++++ 3 files changed, 156 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index da67bab..d086eef 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -139,8 +139,7 @@ public final class CompactionInfo implements Serializable public void started() { reportedSeverity = getCompactionInfo().getTotal() / load; - if (!StorageService.instance.reportSeverity(reportedSeverity)) - reportedSeverity = 0d; + StorageService.instance.reportSeverity(reportedSeverity); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a86d856..3c508ee 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -186,6 +186,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>(); + private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); + private final ObjectName jmxObjectName; public void finishBootstrapping() @@ -927,24 +929,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Gossip about the known severity of the events in this node + * Increment about the known Compaction severity of the events in this node */ - public boolean reportSeverity(double incr) + public void reportSeverity(double incr) { - if (!Gossiper.instance.isEnabled()) - return false; - VersionedValue updated = StorageService.instance.valueFactory.severity(severity.addAndGet(incr)); - Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated); - return true; + bgMonitor.incrCompactionSeverity(incr); } public double getSeverity(InetAddress endpoint) { - VersionedValue event; - EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null) - return Double.parseDouble(event.value); - return 0.0; + return bgMonitor.getSeverity(endpoint); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java new file mode 100644 index 0000000..10215a4 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java @@ -0,0 +1,149 @@ +package org.apache.cassandra.utils; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.util.StringTokenizer; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.service.StorageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.AtomicDouble; + +public class BackgroundActivityMonitor +{ + private static final Logger logger = LoggerFactory.getLogger(BackgroundActivityMonitor.class); + + public static final int USER_INDEX = 0; + public static final int NICE_INDEX = 1; + public static final int SYS_INDEX = 2; + public static final int IDLE_INDEX = 3; + public static final int IOWAIT_INDEX = 4; + public static final int IRQ_INDEX = 5; + public static final int SOFTIRQ_INDEX = 6; + + private static final String OPERATING_SYSTEM = System.getProperty("os.name").toLowerCase(); + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + private static final String PROC_STAT_PATH = "/proc/stat"; + + private final AtomicDouble compaction_severity = new AtomicDouble(); + private final ScheduledExecutorService reportThread = new DebuggableScheduledThreadPoolExecutor("Background_Reporter"); + + private RandomAccessFile statsFile; + private long[] lastReading; + + public BackgroundActivityMonitor() + { + try + { + statsFile = new RandomAccessFile(PROC_STAT_PATH, "r"); + lastReading = readAndCompute(); + } + catch (IOException ex) + { + if (isUnix()) + logger.warn("Couldn't open /proc/stats"); + statsFile = null; + } + reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 1, TimeUnit.SECONDS); + } + + public static boolean isUnix() + { + return OPERATING_SYSTEM.contains("nix") || OPERATING_SYSTEM.contains("nux") || OPERATING_SYSTEM.contains("aix"); + } + + private long[] readAndCompute() throws IOException + { + statsFile.seek(0); + StringTokenizer tokenizer = new StringTokenizer(statsFile.readLine()); + String name = tokenizer.nextToken(); + assert name.equalsIgnoreCase("cpu"); + long[] returned = new long[tokenizer.countTokens()]; + for (int i = 0; i < tokenizer.countTokens(); i++) + returned[i] = Long.valueOf(tokenizer.nextToken()); + return returned; + } + + private float compareAtIndex(long[] reading1, long[] reading2, int index) + { + long total1 = 0, total2 = 0; + for (int i = 0; i <= SOFTIRQ_INDEX; i++) + { + total1 += reading1[i]; + total2 += reading2[i]; + } + float totalDiff = total2 - total1; + + long intrested1 = reading1[index], intrested2 = reading2[index]; + float diff = intrested2 - intrested1; + if (diff == 0) + return 0f; + return (diff / totalDiff) * 100; // yes it is hard coded to 100 [update + // unit?] + } + + public void incrCompactionSeverity(double sev) + { + compaction_severity.addAndGet(sev); + } + + public double getIOWait() throws IOException + { + if (statsFile == null) + return -1d; + long[] newComp = readAndCompute(); + double value = compareAtIndex(lastReading, newComp, IOWAIT_INDEX); + lastReading = newComp; + return value; + } + + public double getNormalizedLoadAvg() + { + double avg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + return avg / NUM_CPUS; + } + + public double getSeverity(InetAddress endpoint) + { + VersionedValue event; + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null) + return Double.parseDouble(event.value); + return 0.0; + } + + public class BackgroundActivityReporter implements Runnable + { + public void run() + { + double report = -1; + try + { + report = getIOWait(); + } + catch (IOException e) + { + // ignore; + if (isUnix()) + logger.warn("Couldn't read /proc/stats"); + } + if (report == -1d) + report = compaction_severity.get(); + + if (!Gossiper.instance.isEnabled()) + return; + VersionedValue updated = StorageService.instance.valueFactory.severity(report); + Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated); + } + } +}
