Fix truncate to not abort due to unreachable fat clients. Patch by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6864.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3050134c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3050134c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3050134c Branch: refs/heads/cassandra-2.1 Commit: 3050134c263f830240e3e9f56b675c3251e312ba Parents: 22b9453 Author: Jonathan Ellis <[email protected]> Authored: Fri Mar 14 14:59:16 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Mar 14 14:59:16 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 18 ++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 8 ++++---- 3 files changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3050134c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4c4be3b..045e4f8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864) * Fix schema concurrency exceptions (CASSANDRA-6841) * Fix leaking validator FH in StreamWriter (CASSANDRA-6832) * Fix saving triggers to schema (CASSANDRA-6789) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3050134c/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 1c17c97..cbd78db 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -252,11 +252,29 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return tokenOwners; } + /** + * @return a list of unreachable 
gossip participants, including fat clients + */ public Set<InetAddress> getUnreachableMembers() { return unreachableEndpoints.keySet(); } + /** + * @return a list of unreachable token owners + */ + public Set<InetAddress> getUnreachableTokenOwners() + { + Set<InetAddress> tokenOwners = new HashSet<>(); + for (InetAddress endpoint : unreachableEndpoints.keySet()) + { + if (StorageService.instance.getTokenMetadata().isMember(endpoint)) + tokenOwners.add(endpoint); + } + + return tokenOwners; + } + public long getEndpointDowntime(InetAddress ep) { Long downtime = unreachableEndpoints.get(ep); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3050134c/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index a6db9cd..a5542e6 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1822,7 +1822,7 @@ public class StorageProxy implements StorageProxyMBean public static void truncateBlocking(String keyspace, String cfname) throws UnavailableException, TimeoutException, IOException { logger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname); - if (isAnyHostDown()) + if (isAnyStorageHostDown()) { logger.info("Cannot perform truncate, some hosts are down"); // Since the truncate operation is so aggressive and is typically only @@ -1860,11 +1860,11 @@ public class StorageProxy implements StorageProxyMBean * Asks the gossiper if there are any nodes that are currently down. * @return true if the gossiper thinks all nodes are up. 
*/ - private static boolean isAnyHostDown() + private static boolean isAnyStorageHostDown() { - return !Gossiper.instance.getUnreachableMembers().isEmpty(); + return !Gossiper.instance.getUnreachableTokenOwners().isEmpty(); } - + public interface WritePerformer { public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException;
