Repository: cassandra Updated Branches: refs/heads/trunk 21b04cf31 -> c0c513b64
Expose phi values from failure detector via JMX patch by Ron Kuris and Ariel Weisberg; reviewed by Ariel Weisberg and Brandon Williams for CASSANDRA-9526 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cd75001 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cd75001 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cd75001 Branch: refs/heads/trunk Commit: 3cd750012e0cf3b55442c036627fb032c29c16bc Parents: 0bfa26d Author: Ron Kuris <[email protected]> Authored: Mon Oct 19 15:50:58 2015 +0100 Committer: Aleksey Yeschenko <[email protected]> Committed: Mon Oct 19 15:50:58 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/gms/FailureDetector.java | 82 +++++++++++++++----- .../cassandra/gms/FailureDetectorMBean.java | 5 ++ .../org/apache/cassandra/tools/NodeProbe.java | 15 +++- .../org/apache/cassandra/tools/NodeTool.java | 3 +- .../tools/nodetool/FailureDetectorInfo.java | 46 +++++++++++ 6 files changed, 132 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0904559..3eff22c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.2.4 + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) * Fix RangeNamesQueryPager (CASSANDRA-10509) * Deprecate Pig support (CASSANDRA-10542) * Reduce contention getting instances of CompositeType (CASSANDRA-10433) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java index 861a853..c563872 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -22,10 +22,13 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +49,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public static final String MBEAN_NAME = "org.apache.cassandra.net:type=FailureDetector"; private static final int SAMPLE_SIZE = 1000; protected static final long INITIAL_VALUE_NANOS = TimeUnit.NANOSECONDS.convert(getInitialValue(), TimeUnit.MILLISECONDS); + private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause(); private long lastInterpret = System.nanoTime(); @@ -71,8 +75,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean // change. private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434... 
- private final Map<InetAddress, ArrivalWindow> arrivalSamples = new Hashtable<InetAddress, ArrivalWindow>(); - private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<IFailureDetectionEventListener>(); + private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>(); + private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<>(); public FailureDetector() { @@ -148,6 +152,34 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean return count; } + @Override + public TabularData getPhiValues() throws OpenDataException + { + final CompositeType ct = new CompositeType("Node", "Node", + new String[]{"Endpoint", "PHI"}, + new String[]{"IP of the endpoint", "PHI value"}, + new OpenType[]{SimpleType.STRING, SimpleType.DOUBLE}); + final TabularDataSupport results = new TabularDataSupport(new TabularType("PhiList", "PhiList", ct, new String[]{"Endpoint"})); + + for (final Map.Entry<InetAddress, ArrivalWindow> entry : arrivalSamples.entrySet()) + { + final ArrivalWindow window = entry.getValue(); + if (window.mean() > 0) + { + final double phi = window.getLastReportedPhi(); + if (phi != Double.MIN_VALUE) + { + // returned values are scaled by PHI_FACTOR so that they are on the same scale as PhiConvictThreshold + final CompositeData data = new CompositeDataSupport(ct, + new String[]{"Endpoint", "PHI"}, + new Object[]{entry.getKey().toString(), phi * PHI_FACTOR}); + results.put(data); + } + } + } + return results; + } + public String getEndpointState(String address) throws UnknownHostException { StringBuilder sb = new StringBuilder(); @@ -219,8 +251,6 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public void report(InetAddress ep) { - if (logger.isTraceEnabled()) - logger.trace("reporting {}", ep); long now = System.nanoTime(); ArrivalWindow heartbeatWindow = arrivalSamples.get(ep); if (heartbeatWindow
== null) @@ -228,12 +258,17 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean // avoid adding an empty ArrivalWindow to the Map heartbeatWindow = new ArrivalWindow(SAMPLE_SIZE); heartbeatWindow.add(now, ep); - arrivalSamples.put(ep, heartbeatWindow); + heartbeatWindow = arrivalSamples.putIfAbsent(ep, heartbeatWindow); + if (heartbeatWindow != null) + heartbeatWindow.add(now, ep); } else { heartbeatWindow.add(now, ep); } + + if (logger.isTraceEnabled() && heartbeatWindow != null) + logger.trace("Average for {} is {}", ep, heartbeatWindow.mean()); } public void interpret(InetAddress ep) @@ -263,13 +298,22 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean if (PHI_FACTOR * phi > getPhiConvictThreshold()) { - logger.trace("notifying listeners that {} is down", ep); - logger.trace("intervals: {} mean: {}", hbWnd, hbWnd.mean()); + if (logger.isTraceEnabled()) + logger.trace("Node {} phi {} > {}; intervals: {} mean: {}", new Object[]{ep, PHI_FACTOR * phi, getPhiConvictThreshold(), hbWnd, hbWnd.mean()}); for (IFailureDetectionEventListener listener : fdEvntListeners) { listener.convict(ep, phi); } } + else if (logger.isDebugEnabled() && (PHI_FACTOR * phi > getPhiConvictThreshold() * DEBUG_PERCENTAGE / 100.0)) + { + logger.debug("PHI for {} : {}", ep, PHI_FACTOR * phi); + } + else if (logger.isTraceEnabled()) + { + logger.trace("PHI for {} : {}", ep, PHI_FACTOR * phi); + logger.trace("mean for {} : {}", ep, hbWnd.mean()); + } } public void forceConviction(InetAddress ep) @@ -312,10 +356,6 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean sb.append("-----------------------------------------------------------------------"); return sb.toString(); } - - public static void main(String[] args) - { - } } /* @@ -372,12 +412,7 @@ class ArrivalWindow private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class); private long tLast = 0L; private final ArrayBackedBoundedStats arrivalIntervals; -
// this is useless except to provide backwards compatibility in phi_convict_threshold, - // because everyone seems pretty accustomed to the default of 8, and users who have - // already tuned their phi_convict_threshold for their own environments won't need to - // change. - private final double PHI_FACTOR = 1.0 / Math.log(10.0); + private double lastReportedPhi = Double.MIN_VALUE; // in the event of a long partition, never record an interval longer than the rpc timeout, // since if a host is regularly experiencing connectivity problems lasting this long we'd @@ -411,9 +446,14 @@ class ArrivalWindow { long interArrivalTime = (value - tLast); if (interArrivalTime <= MAX_INTERVAL_IN_NANO) + { arrivalIntervals.add(interArrivalTime); + logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep); + } else + { logger.debug("Ignoring interval time of {} for {}", interArrivalTime, ep); + } } else { @@ -435,7 +475,13 @@ class ArrivalWindow { assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive long t = tnow - tLast; - return t / mean(); + lastReportedPhi = t / mean(); + return lastReportedPhi; + } + + double getLastReportedPhi() + { + return lastReportedPhi; } public String toString() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java index 45250b4..23fae3a 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java +++ b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java @@ -20,6 +20,9 @@ package org.apache.cassandra.gms; import java.net.UnknownHostException; import java.util.Map; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; + public interface 
FailureDetectorMBean { public void dumpInterArrivalTimes(); @@ -37,4 +40,6 @@ public interface FailureDetectorMBean public int getDownEndpointCount(); public int getUpEndpointCount(); + + public TabularData getPhiValues() throws OpenDataException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index da403ab..62795b5 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -27,7 +27,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMISocketFactory; -import java.text.SimpleDateFormat; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; @@ -522,7 +521,7 @@ public class NodeProbe implements AutoCloseable /** * Take a snapshot of all column family from different keyspaces. - * + * * @param snapshotName * the name of the snapshot. 
* @param columnFamilyList @@ -1277,6 +1276,18 @@ public class NodeProbe implements AutoCloseable } } } + + public TabularData getFailureDetectorPhilValues() + { + try + { + return fdProxy.getPhiValues(); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index da3560d..175b325 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -129,7 +129,8 @@ public class NodeTool TpStats.class, TopPartitions.class, SetLoggingLevel.class, - GetLoggingLevels.class + GetLoggingLevels.class, + FailureDetectorInfo.class ); Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java new file mode 100644 index 0000000..72c109a --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Command; + +import java.util.List; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "failuredetector", description = "Shows the failure detector information for the cluster") +public class FailureDetectorInfo extends NodeToolCmd +{ + @Override + public void execute(NodeProbe probe) + { + TabularData data = probe.getFailureDetectorPhilValues(); + System.out.printf("%10s,%16s\n", "Endpoint", "Phi"); + for (Object o : data.keySet()) + { + @SuppressWarnings({ "rawtypes", "unchecked" }) + CompositeData datum = data.get(((List) o).toArray(new Object[((List) o).size()])); + System.out.printf("%10s,%16.8f\n",datum.get("Endpoint"), datum.get("PHI")); + } + } +} +
