Updated Branches: refs/heads/cassandra-1.1 9a8a8902b -> aa2c28ead refs/heads/trunk 1440079d4 -> 2e7d1f83f
improve "nodetool ring" handling of multi-dc clusters patch by dralves; reviewed by jbellis for CASSANDRA-3047 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e7d1f83 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e7d1f83 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e7d1f83 Branch: refs/heads/trunk Commit: 2e7d1f83f53d0aa2ffc52af6d8531c0355318484 Parents: 41ec9fc Author: Jonathan Ellis <[email protected]> Authored: Fri Jul 6 15:15:45 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Jul 6 15:15:45 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/StorageService.java | 139 +++++++++------ .../cassandra/service/StorageServiceMBean.java | 5 +- src/java/org/apache/cassandra/tools/NodeCmd.java | 130 +++++++++----- src/java/org/apache/cassandra/tools/NodeProbe.java | 4 +- 5 files changed, 174 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ecda2dc..6b3aba2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047) * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881) * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366) * split up rpc timeout by operation type (CASSANDRA-2819) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 74da07e..e196f7c 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -25,17 +25,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; -import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.base.Supplier; +import com.google.common.base.Function; import com.google.common.collect.*; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.metrics.ClientRequestMetrics; import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -49,23 +45,21 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.DynamicEndpointSnitch; -import org.apache.cassandra.locator.IEndpointSnitch; -import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.net.*; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.metrics.ClientRequestMetrics; +import org.apache.cassandra.net.IAsyncResult; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ResponseVerbHandler; import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler; import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NodeId; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.OutputHandler; -import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.*; /** * This abstraction contains the token/identifier of this node @@ -958,25 +952,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe return rangeToEndpointMap; } - private Map<InetAddress, Collection<Range<Token>>> constructEndpointToRangeMap(String keyspace) - { - Multimap<InetAddress, Range<Token>> endpointToRangeMap = Multimaps.newListMultimap(new HashMap<InetAddress, Collection<Range<Token>>>(), new Supplier<List<Range<Token>>>() - { - public List<Range<Token>> get() - { - return Lists.newArrayList(); - } - }); - - List<Range<Token>> ranges = getAllRanges(tokenMetadata.sortedTokens()); - for (Range<Token> range : ranges) - { - for (InetAddress endpoint : Table.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.left)) - endpointToRangeMap.put(endpoint, range); - } - return endpointToRangeMap.asMap(); - } - /* * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update @@ -2759,22 +2734,35 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe StorageProxy.truncateBlocking(keyspace, columnFamily); } - public Map<String, Float> getOwnership() + public boolean isDcAwareReplicationStrategy(String keyspace) { - List<Token> sortedTokens = new ArrayList<Token>(tokenMetadata.getTokenToEndpointMapForReading().keySet()); + return SimpleStrategy.class != Table.open(keyspace).getReplicationStrategy().getClass(); + } + + public Map<InetAddress, Float> getOwnership() + { + Map<Token, InetAddress> tokensToEndpoints = tokenMetadata.getTokenToEndpointMapForReading(); + List<Token> sortedTokens = new ArrayList<Token>(tokensToEndpoints.keySet()); Collections.sort(sortedTokens); - Map<Token, Float> token_map = getPartitioner().describeOwnership(sortedTokens); - Map<String, Float> string_map = new HashMap<String, Float>(); - for(Map.Entry<Token, Float> entry : token_map.entrySet()) - { - string_map.put(entry.getKey().toString(), entry.getValue()); - } - return string_map; + // describeOwnership returns tokens in an unspecified order, let's re-order them + Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens)); + Map<InetAddress, Float> stringMap = new LinkedHashMap<InetAddress, Float>(); + for (Map.Entry<Token, Float> entry : tokenMap.entrySet()) + stringMap.put(tokensToEndpoints.get(entry.getKey()), entry.getValue()); + return stringMap; } - public Map<String, Float> effectiveOwnership(String keyspace) throws ConfigurationException + /** + * Calculates ownership. If there are multiple DC's and the replication strategy is DC aware then ownership will be + * calculated per dc, i.e. each DC will have total ring ownership divided amongst its nodes. Without replication + * total ownership will be a multiple of the number of DC's and this value will then go up within each DC depending + * on the number of replicas within itself. For DC unaware replication strategies, ownership without replication + * will be 100%. + * + * @throws ConfigurationException + */ + public LinkedHashMap<InetAddress, Float> effectiveOwnership(String keyspace) throws ConfigurationException { - Map<String, Float> effective = Maps.newHashMap(); if (Schema.instance.getNonSystemTables().size() <= 0) throw new ConfigurationException("Couldn't find any Non System Keyspaces to infer replication topology"); if (keyspace == null && !hasSameReplication(Schema.instance.getNonSystemTables())) @@ -2783,22 +2771,65 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (keyspace == null) keyspace = Schema.instance.getNonSystemTables().get(0); - List<Token> sortedTokens = new ArrayList<Token>(tokenMetadata.getTokenToEndpointMapForReading().keySet()); - Collections.sort(sortedTokens); - Map<Token, Float> ownership = getPartitioner().describeOwnership(sortedTokens); + final BiMap<InetAddress, Token> endpointsToTokens = ImmutableBiMap.copyOf(tokenMetadata.getTokenToEndpointMapForReading()).inverse(); - for (Entry<InetAddress, Collection<Range<Token>>> ranges : constructEndpointToRangeMap(keyspace).entrySet()) + Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>(); + if (isDcAwareReplicationStrategy(keyspace)) + { + // mapping of dc's to nodes, use sorted map so that we get dcs sorted + SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>(); + sortedDcsToEndpoints.putAll(tokenMetadata.getTopology().getDatacenterEndpoints().asMap()); + for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values()) + endpointsGroupedByDc.add(endpoints); + } + else { - Token token = tokenMetadata.getToken(ranges.getKey()); - for (Range<Token> range: ranges.getValue()) + endpointsGroupedByDc.add(endpointsToTokens.keySet()); + } + + LinkedHashMap<InetAddress, Float> finalOwnership = Maps.newLinkedHashMap(); + + // calculate ownership per dc + for (Collection<InetAddress> endpoints : endpointsGroupedByDc) + { + // sort the endpoints by their tokens + List<InetAddress> sortedEndpoints = Lists.newArrayListWithExpectedSize(endpoints.size()); + sortedEndpoints.addAll(endpoints); + + Collections.sort(sortedEndpoints, new Comparator<InetAddress>() { - float value = effective.get(token.toString()) == null ? 0.0F : effective.get(token.toString()); - effective.put(token.toString(), value + ownership.get(range.left)); + public int compare(InetAddress o1, InetAddress o2) + { + return endpointsToTokens.get(o1).compareTo(endpointsToTokens.get(o2)); + } + }); + + // calculate the ownership without replication + Function<InetAddress, Token> f = new Function<InetAddress, Token>() + { + public Token apply(InetAddress arg0) + { + return endpointsToTokens.get(arg0); + } + }; + Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(Lists.transform(sortedEndpoints, f)); + + // calculate the ownership with replication and add the endpoint to the final ownership map + for (InetAddress endpoint : endpoints) + { + float ownership = 0.0f; + for (Range<Token> range : getRangesForEndpoint(keyspace, endpoint)) + { + if (tokenOwnership.containsKey(range.left)) + ownership += tokenOwnership.get(range.left); + } + finalOwnership.put(endpoint, ownership); } } - return effective; + return finalOwnership; } + private boolean hasSameReplication(List<String> list) { if (list.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index ff1ec03..80ba05f 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -328,7 +329,7 @@ public interface StorageServiceMBean * given a list of tokens (representing the nodes in the cluster), returns * a mapping from "token -> %age of cluster owned by that token" */ - public Map<String, Float> getOwnership(); + public Map<InetAddress, Float> getOwnership(); /** * Effective ownership is % of the data each node owns given the keyspace @@ -337,7 +338,7 @@ public interface StorageServiceMBean * in the cluster have the same replication strategies and if yes then we will * use the first else a empty Map is returned. */ - public Map<String, Float> effectiveOwnership(String keyspace) throws ConfigurationException; + public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws ConfigurationException; public List<String> getKeyspaces(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 315e9c3..808494e 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -28,8 +28,8 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; -import org.apache.cassandra.service.CacheServiceMBean; -import org.apache.cassandra.service.StorageProxyMBean; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.Maps; import org.apache.commons.cli.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; @@ -38,6 +38,8 @@ import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.compaction.CompactionManagerMBean; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.net.MessagingServiceMBean; +import org.apache.cassandra.service.CacheServiceMBean; +import org.apache.cassandra.service.StorageProxyMBean; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.Pair; @@ -208,14 +210,53 @@ public class NodeCmd /** * Write a textual representation of the Cassandra ring. - * - * @param outs the stream to write to + * + * @param outs + * the stream to write to */ public void printRing(PrintStream outs, String keyspace) { - Map<String, String> tokenToEndpoint = probe.getTokenToEndpointMap(); - List<String> sortedTokens = new ArrayList<String>(tokenToEndpoint.keySet()); + Map<String, String> endpointsToTokens = ImmutableBiMap.copyOf(probe.getTokenToEndpointMap()).inverse(); + String format = "%-16s%-12s%-7s%-8s%-16s%-20s%-44s%n"; + // Calculate per-token ownership of the ring + Map<InetAddress, Float> ownerships; + boolean keyspaceSelected; + try + { + ownerships = probe.effectiveOwnership(keyspace); + keyspaceSelected = true; + } + catch (ConfigurationException ex) + { + ownerships = probe.getOwnership(); + outs.printf("Note: Ownership information does not include topology; for complete information, specify a keyspace%n"); + keyspaceSelected = false; + } + try + { + outs.println(); + Map<String, Map<InetAddress, Float>> perDcOwnerships = Maps.newLinkedHashMap(); + // get the different datasets and map to tokens + for (Map.Entry<InetAddress, Float> ownership : ownerships.entrySet()) + { + String dc = probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress()); + if (!perDcOwnerships.containsKey(dc)) + perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, Float>()); + perDcOwnerships.get(dc).put(ownership.getKey(), ownership.getValue()); + } + for (Map.Entry<String, Map<InetAddress, Float>> entry : perDcOwnerships.entrySet()) + printDc(outs, format, entry.getKey(), endpointsToTokens, keyspaceSelected, entry.getValue()); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private void printDc(PrintStream outs, String format, String dc, Map<String, String> endpointsToTokens, + boolean keyspaceSelected, Map<InetAddress, Float> filteredOwnerships) + { Collection<String> liveNodes = probe.getLiveNodes(); Collection<String> deadNodes = probe.getUnreachableNodes(); Collection<String> joiningNodes = probe.getJoiningNodes(); @@ -223,69 +264,64 @@ public class NodeCmd Collection<String> movingNodes = probe.getMovingNodes(); Map<String, String> loadMap = probe.getLoadMap(); - String format = "%-16s%-12s%-12s%-7s%-8s%-16s%-20s%-44s%n"; + outs.println("Datacenter: " + dc); + outs.println("=========="); - // Calculate per-token ownership of the ring - Map<String, Float> ownerships; - try - { - ownerships = probe.effectiveOwnership(keyspace); - outs.printf(format, "Address", "DC", "Rack", "Status", "State", "Load", "Effective-Ownership", "Token"); - } - catch (ConfigurationException ex) + // get the total amount of replicas for this dc and the last token in this dc's ring + float totalReplicas = 0f; + String lastToken = ""; + for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet()) { - ownerships = probe.getOwnership(); - outs.printf("Note: Ownership information does not include topology, please specify a keyspace. %n"); - outs.printf(format, "Address", "DC", "Rack", "Status", "State", "Load", "Owns", "Token"); + lastToken = endpointsToTokens.get(entry.getKey().getHostAddress()); + totalReplicas += entry.getValue(); } + + + if (keyspaceSelected) + outs.print("Replicas: " + (int) totalReplicas + "\n\n"); - // show pre-wrap token twice so you can always read a node's range as - // (previous line token, current line token] - if (sortedTokens.size() > 1) - outs.printf(format, "", "", "", "", "", "", "", sortedTokens.get(sortedTokens.size() - 1)); + outs.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token"); - for (String token : sortedTokens) + if (filteredOwnerships.size() > 1) + outs.printf(format, "", "", "", "", "", "", lastToken); + else + outs.println(); + + for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet()) { - String primaryEndpoint = tokenToEndpoint.get(token); - String dataCenter; - try - { - dataCenter = probe.getEndpointSnitchInfoProxy().getDatacenter(primaryEndpoint); - } - catch (UnknownHostException e) - { - dataCenter = "Unknown"; - } + String endpoint = entry.getKey().getHostAddress(); + String token = endpointsToTokens.get(entry.getKey().getHostAddress()); String rack; try { - rack = probe.getEndpointSnitchInfoProxy().getRack(primaryEndpoint); + rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint); } catch (UnknownHostException e) { rack = "Unknown"; } - String status = liveNodes.contains(primaryEndpoint) - ? "Up" - : deadNodes.contains(primaryEndpoint) - ? "Down" - : "?"; + String status = liveNodes.contains(endpoint) + ? "Up" + : deadNodes.contains(endpoint) + ? "Down" + : "?"; String state = "Normal"; - if (joiningNodes.contains(primaryEndpoint)) + if (joiningNodes.contains(endpoint)) state = "Joining"; - else if (leavingNodes.contains(primaryEndpoint)) + else if (leavingNodes.contains(endpoint)) state = "Leaving"; - else if (movingNodes.contains(primaryEndpoint)) + else if (movingNodes.contains(endpoint)) state = "Moving"; - String load = loadMap.containsKey(primaryEndpoint) - ? loadMap.get(primaryEndpoint) - : "?"; - String owns = new DecimalFormat("##0.00%").format(ownerships.get(token) == null ? 0.0F : ownerships.get(token)); - outs.printf(format, primaryEndpoint, dataCenter, rack, status, state, load, owns, token); + String load = loadMap.containsKey(endpoint) + ? loadMap.get(endpoint) + : "?"; + String owns = new DecimalFormat("##0.00%").format(entry.getValue()); + outs.printf(format, entry.getKey(), rack, status, state, load, owns, token); } + outs.println(); } /** Writes a table of host IDs to a PrintStream */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index f6de85b..8ad8b92 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -263,12 +263,12 @@ public class NodeProbe return ssProxy.getLoadMap(); } - public Map<String, Float> getOwnership() + public Map<InetAddress, Float> getOwnership() { return ssProxy.getOwnership(); } - public Map<String, Float> effectiveOwnership(String keyspace) throws ConfigurationException + public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws ConfigurationException { return ssProxy.effectiveOwnership(keyspace); }
