Repository: cassandra
Updated Branches:
  refs/heads/trunk a8f6a6945 -> 286f6a143


Remove compaction Severity from DynamicEndpointSnitch
patch by jbellis; reviewed by Jeremiah Jordan for CASSANDRA-11738


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/286f6a14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/286f6a14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/286f6a14

Branch: refs/heads/trunk
Commit: 286f6a143573de267c1595fe4dd83108ed5356fc
Parents: a8f6a69
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Jul 22 17:20:29 2016 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Jul 22 17:24:22 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/db/compaction/CompactionInfo.java |  20 ---
 .../locator/DynamicEndpointSnitch.java          |  25 ++-
 .../locator/DynamicEndpointSnitchMBean.java     |  25 ++-
 .../cassandra/metrics/CompactionMetrics.java    |   4 -
 .../cassandra/service/StorageService.java       |  20 ---
 .../utils/BackgroundActivityMonitor.java        | 171 -------------------
 7 files changed, 46 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f71489..efda89e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@
  * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
  * Add supplied username to authentication error messages (CASSANDRA-12076)
  * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+
 
 
 3.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index fe81eac..535217f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -138,8 +138,6 @@ public final class CompactionInfo implements Serializable
     {
         private volatile boolean stopRequested = false;
         public abstract CompactionInfo getCompactionInfo();
-        double load = StorageMetrics.load.getCount();
-        double reportedSeverity = 0d;
 
         public void stop()
         {
@@ -150,23 +148,5 @@ public final class CompactionInfo implements Serializable
         {
             return stopRequested;
         }
-        /**
-         * report event on the size of the compaction.
-         */
-        public void started()
-        {
-            reportedSeverity = getCompactionInfo().getTotal() / load;
-            StorageService.instance.reportSeverity(reportedSeverity);
-        }
-
-        /**
-         * remove the event complete
-         */
-        public void finished()
-        {
-            if (reportedSeverity != 0d)
-                StorageService.instance.reportSeverity(-(reportedSeverity));
-            reportedSeverity = 0d;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 70aecb0..08f6aa6 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
@@ -31,9 +32,14 @@ import javax.management.ObjectName;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.ftpserver.command.impl.STOR;
 
 
 /**
@@ -283,7 +289,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             // finally, add the severity without any weighting, since hosts 
scale this relative to their own load and the size of the task causing the 
severity.
             // "Severity" is basically a measure of compaction activity 
(CASSANDRA-3722).
             if (USE_SEVERITY)
-                score += StorageService.instance.getSeverity(entry.getKey());
+                score += getSeverity(entry.getKey());
             // lowest score (least amount of badness) wins.
             newScores.put(entry.getKey(), score);
         }
@@ -333,12 +339,25 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
 
     public void setSeverity(double severity)
     {
-        StorageService.instance.reportManualSeverity(severity);
+        Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, 
StorageService.instance.valueFactory.severity(severity));
+    }
+
+    private double getSeverity(InetAddress endpoint)
+    {
+        EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state == null)
+            return 0.0;
+
+        VersionedValue event = 
state.getApplicationState(ApplicationState.SEVERITY);
+        if (event == null)
+            return 0.0;
+
+        return Double.parseDouble(event.value);
     }
 
     public double getSeverity()
     {
-        return 
StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
+        return getSeverity(FBUtilities.getBroadcastAddress());
     }
 
     public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, 
List<InetAddress> l1, List<InetAddress> l2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index a413bc5..552a16d 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -29,11 +29,30 @@ public interface DynamicEndpointSnitchMBean {
     public double getBadnessThreshold();
     public String getSubsnitchClassName();
     public List<Double> dumpTimings(String hostname) throws 
UnknownHostException;
+
     /**
-     * Use this if you want to specify a severity; it can be negative
-     * Example: Page cache is cold and you want data to be sent 
-     *          though it is not preferred one.
+     * Setting a Severity allows operators to inject preference information 
into the Dynamic Snitch
+     * replica selection.
+     *
+     * When choosing which replicas to participate in a read request, the 
DSnitch sorts replicas
+     * by response latency, and selects the fastest replicas.  Latencies are 
normalized to a score
+     * from 0 to 1,  with lower scores being faster.
+     *
+     * The Severity injected here will be added to the normalized score.
+     *
+     * Thus, adding a Severity greater than 1 will mean the replica will never 
be contacted
+     * (unless needed for ALL or if it is added later for rapid read 
protection).
+     *
+     * Conversely, adding a negative Severity means the replica will *always* 
be contacted.
+     *
+     * (The "Severity" term is historical and dates to when this was used to 
represent how
+     * badly background tasks like compaction were affecting a replica's 
performance.
+     * See CASSANDRA-3722 for when this was introduced and CASSANDRA-11738 for 
why it was removed.)
      */
     public void setSeverity(double severity);
+
+    /**
+     * @return the current manually injected Severity.
+     */
     public double getSeverity();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java 
b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index 9d2863f..2cddfff 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -140,15 +140,11 @@ public class CompactionMetrics implements 
CompactionManager.CompactionExecutorSt
 
     public void beginCompaction(CompactionInfo.Holder ci)
     {
-        // notify
-        ci.started();
         compactions.add(ci);
     }
 
     public void finishCompaction(CompactionInfo.Holder ci)
     {
-        // notify
-        ci.finished();
         compactions.remove(ci);
         bytesCompacted.inc(ci.getCompactionInfo().getTotal());
         totalCompactionsCompleted.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 2883e24..d64fc04 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -189,8 +189,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = 
new CopyOnWriteArrayList<>();
 
-    private static final BackgroundActivityMonitor bgMonitor = new 
BackgroundActivityMonitor();
-
     private final ObjectName jmxObjectName;
 
     private Collection<Token> bootstrapTokens = null;
@@ -1468,24 +1466,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Increment about the known Compaction severity of the events in this node
-     */
-    public void reportSeverity(double incr)
-    {
-        bgMonitor.incrCompactionSeverity(incr);
-    }
-
-    public void reportManualSeverity(double incr)
-    {
-        bgMonitor.incrManualSeverity(incr);
-    }
-
-    public double getSeverity(InetAddress endpoint)
-    {
-        return bgMonitor.getSeverity(endpoint);
-    }
-
-    /**
      * for a keyspace, return the ranges and corresponding listen addresses.
      * @param keyspace
      * @return the endpoint map

http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java 
b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
deleted file mode 100644
index 1799d10..0000000
--- a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package org.apache.cassandra.utils;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.util.StringTokenizer;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class BackgroundActivityMonitor
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(BackgroundActivityMonitor.class);
-
-    public static final int USER_INDEX = 0;
-    public static final int NICE_INDEX = 1;
-    public static final int SYS_INDEX = 2;
-    public static final int IDLE_INDEX = 3;
-    public static final int IOWAIT_INDEX = 4;
-    public static final int IRQ_INDEX = 5;
-    public static final int SOFTIRQ_INDEX = 6;
-
-    private static final int NUM_CPUS = 
Runtime.getRuntime().availableProcessors();
-    private static final String PROC_STAT_PATH = "/proc/stat";
-
-    private final AtomicDouble compaction_severity = new AtomicDouble();
-    private final AtomicDouble manual_severity = new AtomicDouble();
-    private final ScheduledExecutorService reportThread = new 
DebuggableScheduledThreadPoolExecutor("Background_Reporter");
-
-    private RandomAccessFile statsFile;
-    private long[] lastReading;
-
-    public BackgroundActivityMonitor()
-    {
-        try
-        {
-            statsFile = new RandomAccessFile(PROC_STAT_PATH, "r");
-            lastReading = readAndCompute();
-        }
-        catch (IOException ex)
-        {
-            if (FBUtilities.hasProcFS())
-                logger.warn("Couldn't open /proc/stats");
-            statsFile = null;
-        }
-        reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 
1, TimeUnit.SECONDS);
-    }
-
-    private long[] readAndCompute() throws IOException
-    {
-        statsFile.seek(0);
-        StringTokenizer tokenizer = new StringTokenizer(statsFile.readLine());
-        String name = tokenizer.nextToken();
-        assert name.equalsIgnoreCase("cpu");
-        long[] returned = new long[tokenizer.countTokens()];
-        for (int i = 0; i < returned.length; i++)
-            returned[i] = Long.parseLong(tokenizer.nextToken());
-        return returned;
-    }
-
-    private float compareAtIndex(long[] reading1, long[] reading2, int index)
-    {
-        long total1 = 0, total2 = 0;
-        for (int i = 0; i <= SOFTIRQ_INDEX; i++)
-        {
-            total1 += reading1[i];
-            total2 += reading2[i];
-        }
-        float totalDiff = total2 - total1;
-
-        long intrested1 = reading1[index], intrested2 = reading2[index];
-        float diff = intrested2 - intrested1;
-        if (diff == 0)
-            return 0f;
-        return (diff / totalDiff) * 100; // yes it is hard coded to 100 [update
-                                         // unit?]
-    }
-
-    public void incrCompactionSeverity(double sev)
-    {
-        compaction_severity.addAndGet(sev);
-    }
-
-    public void incrManualSeverity(double sev)
-    {
-        manual_severity.addAndGet(sev);
-    }
-
-    public double getIOWait() throws IOException
-    {
-        if (statsFile == null)
-            return -1d;
-        long[] newComp = readAndCompute();
-        double value = compareAtIndex(lastReading, newComp, IOWAIT_INDEX);
-        lastReading = newComp;
-        return value;
-    }
-
-    public double getNormalizedLoadAvg()
-    {
-        double avg = 
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
-        return avg / NUM_CPUS;
-    }
-
-    public double getSeverity(InetAddress endpoint)
-    {
-        VersionedValue event;
-        EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state != null && (event = 
state.getApplicationState(ApplicationState.SEVERITY)) != null)
-            return Double.parseDouble(event.value);
-        return 0.0;
-    }
-
-    public class BackgroundActivityReporter implements Runnable
-    {
-        public void run()
-        {
-            double report = -1;
-            try
-            {
-                report = getIOWait();
-            }
-            catch (IOException e)
-            {
-                // ignore;
-                if (FBUtilities.hasProcFS())
-                    logger.warn("Couldn't read /proc/stats");
-            }
-            if (report == -1d)
-                report = compaction_severity.get();
-
-            if (!Gossiper.instance.isEnabled())
-                return;
-            report += manual_severity.get(); // add manual severity setting.
-            VersionedValue updated = 
StorageService.instance.valueFactory.severity(report);
-            
Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
-        }
-    }
-}

Reply via email to