Author: brandonwilliams Date: Thu Jan 20 22:53:11 2011 New Revision: 1061557
URL: http://svn.apache.org/viewvc?rev=1061557&view=rev Log: Add a configurable maximum amount of time to hint for a dead host. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1459 Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/conf/cassandra.yaml?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/conf/cassandra.yaml (original) +++ cassandra/branches/cassandra-0.7/conf/cassandra.yaml Thu Jan 20 22:53:11 2011 @@ -31,6 +31,10 @@ auto_bootstrap: false # See http://wiki.apache.org/cassandra/HintedHandoff hinted_handoff_enabled: true +# this defines the maximum amount of time a dead host will have hints +# generated. After it has been dead this long, hints will be dropped. +# Maximum is approximately 50 days +max_hint_window_in_ms: 2147483647 # authentication backend, implementing IAuthenticator; used to identify users authenticator: org.apache.cassandra.auth.AllowAllAuthenticator Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java Thu Jan 20 22:53:11 2011 @@ -34,6 +34,7 @@ public class Config public Boolean auto_bootstrap = false; public Boolean hinted_handoff_enabled = true; + public Integer max_hint_window_in_ms = Integer.MAX_VALUE; public String[] seeds; public DiskAccessMode disk_access_mode = DiskAccessMode.auto; Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jan 20 22:53:11 2011 @@ -1079,6 +1079,11 @@ public class DatabaseDescriptor return conf.hinted_handoff_enabled; } + public static int getMaxHintWindow() + { + return conf.max_hint_window_in_ms; + } + public static AbstractType getValueValidator(String keyspace, String cf, ByteBuffer column) { return getCFMetaData(keyspace, cf).getValueValidator(column); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jan 20 22:53:11 2011 @@ -128,7 +128,7 @@ public class Gossiper implements IFailur private Set<InetAddress> liveEndpoints_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* unreachable member set */ - private Set<InetAddress> unreachableEndpoints_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); + private Map<InetAddress, Long> unreachableEndpoints_ = new ConcurrentHashMap<InetAddress, Long>(); /* initial seeds for joining the cluster */ private Set<InetAddress> seeds_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); @@ -179,7 +179,16 @@ public class Gossiper implements IFailur public Set<InetAddress> getUnreachableMembers() { - return new HashSet<InetAddress>(unreachableEndpoints_); + return unreachableEndpoints_.keySet(); + } + + public long getEndpointDowntime(InetAddress ep) + { + Long downtime = unreachableEndpoints_.get(ep); + if (downtime != null) + return System.currentTimeMillis() - downtime; + else + return 0L; } /** @@ -353,7 +362,7 @@ public class Gossiper implements IFailur double prob = unreachableEndpoints / (liveEndpoints + 1); double randDbl = random_.nextDouble(); if ( randDbl < prob ) - sendGossip(message, unreachableEndpoints_); + sendGossip(message, unreachableEndpoints_.keySet()); } } @@ -735,7 +744,7 @@ public class Gossiper implements IFailur else { liveEndpoints_.remove(addr); - unreachableEndpoints_.add(addr); + unreachableEndpoints_.put(addr, System.currentTimeMillis()); for (IEndpointStateChangeSubscriber subscriber : subscribers_) subscriber.onDead(addr, epState); } @@ -871,7 +880,7 @@ public class Gossiper implements IFailur epState.isAGossiper(true); epState.setHasToken(true); endpointStateMap_.put(ep, epState); - unreachableEndpoints_.add(ep); + unreachableEndpoints_.put(ep, System.currentTimeMillis()); } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Jan 20 22:53:11 2011 @@ -25,6 +25,7 @@ import java.util.*; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.gms.Gossiper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +164,12 @@ public abstract class AbstractReplicatio { if (map.containsKey(ep)) continue; + if (!StorageProxy.shouldHint(ep)) + { + if (logger.isDebugEnabled()) + logger.debug("not hinting " + ep + " which has been down " + Gossiper.instance.getEndpointDowntime(ep) + "ms"); + continue; + } InetAddress destination = map.isEmpty() ? localAddress Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Thu Jan 20 22:53:11 2011 @@ -75,6 +75,7 @@ public class StorageProxy implements Sto private static final LatencyTracker rangeStats = new LatencyTracker(); private static final LatencyTracker writeStats = new LatencyTracker(); private static boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled(); + private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); private static final String UNREACHABLE = "UNREACHABLE"; private StorageProxy() {} @@ -182,7 +183,7 @@ public class StorageProxy implements Sto } } responseHandler.addHintCallback(hintedMessage, destination); - + Multimap<Message, InetAddress> messages = dcMessages.get(dc); if (messages == null) @@ -190,7 +191,7 @@ public class StorageProxy implements Sto messages = HashMultimap.create(); dcMessages.put(dc, messages); } - + messages.put(hintedMessage, destination); } } @@ -803,6 +804,21 @@ public class StorageProxy implements Sto return hintedHandoffEnabled; } + public int getMaxHintWindow() + { + return maxHintWindow; + } + + public void setMaxHintWindow(int ms) + { + maxHintWindow = ms; + } + + public static boolean shouldHint(InetAddress ep) + { + return Gossiper.instance.getEndpointDowntime(ep) <= maxHintWindow; + } + /** * Performs the truncate operatoin, which effectively deletes all data from * the column family cfname Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1061557&r1=1061556&r2=1061557&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java Thu Jan 20 22:53:11 2011 @@ -40,4 +40,6 @@ public interface StorageProxyMBean public boolean getHintedHandoffEnabled(); public void setHintedHandoffEnabled(boolean b); + public int getMaxHintWindow(); + public void setMaxHintWindow(int ms); }
