Updated Branches:
  refs/heads/cassandra-1.1 9a8a8902b -> aa2c28ead
  refs/heads/trunk 1440079d4 -> 2e7d1f83f


improve "nodetool ring" handling of multi-dc clusters
patch by dralves; reviewed by jbellis for CASSANDRA-3047


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e7d1f83
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e7d1f83
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e7d1f83

Branch: refs/heads/trunk
Commit: 2e7d1f83f53d0aa2ffc52af6d8531c0355318484
Parents: 41ec9fc
Author: Jonathan Ellis <[email protected]>
Authored: Fri Jul 6 15:15:45 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Fri Jul 6 15:15:45 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/service/StorageService.java   |  139 +++++++++------
 .../cassandra/service/StorageServiceMBean.java     |    5 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |  130 +++++++++-----
 src/java/org/apache/cassandra/tools/NodeProbe.java |    4 +-
 5 files changed, 174 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecda2dc..6b3aba2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
  * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
  * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
  * split up rpc timeout by operation type (CASSANDRA-2819)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 74da07e..e196f7c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -25,17 +25,13 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Supplier;
+import com.google.common.base.Function;
 import com.google.common.collect.*;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.metrics.ClientRequestMetrics;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -49,23 +45,21 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.locator.*;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NodeId;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.*;
 
 /**
  * This abstraction contains the token/identifier of this node
@@ -958,25 +952,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         return rangeToEndpointMap;
     }
 
-    private Map<InetAddress, Collection<Range<Token>>> 
constructEndpointToRangeMap(String keyspace)
-    {
-        Multimap<InetAddress, Range<Token>> endpointToRangeMap = 
Multimaps.newListMultimap(new HashMap<InetAddress, Collection<Range<Token>>>(), 
new Supplier<List<Range<Token>>>()
-        {
-            public List<Range<Token>> get()
-            {
-                return Lists.newArrayList();
-            }
-        });
-
-        List<Range<Token>> ranges = getAllRanges(tokenMetadata.sortedTokens());
-        for (Range<Token> range : ranges)
-        {
-            for (InetAddress endpoint : 
Table.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.left))
-                endpointToRangeMap.put(endpoint, range);
-        }
-        return endpointToRangeMap.asMap();
-    }
-
     /*
      * Handle the reception of a new particular ApplicationState for a 
particular endpoint. Note that the value of the
      * ApplicationState has not necessarily "changed" since the last known 
value, if we already received the same update
@@ -2759,22 +2734,35 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         StorageProxy.truncateBlocking(keyspace, columnFamily);
     }
 
-    public Map<String, Float> getOwnership()
+    public boolean isDcAwareReplicationStrategy(String keyspace)
     {
-        List<Token> sortedTokens = new 
ArrayList<Token>(tokenMetadata.getTokenToEndpointMapForReading().keySet());
+        return SimpleStrategy.class != 
Table.open(keyspace).getReplicationStrategy().getClass();
+    }
+
+    public Map<InetAddress, Float> getOwnership()
+    {
+        Map<Token, InetAddress> tokensToEndpoints = 
tokenMetadata.getTokenToEndpointMapForReading();
+        List<Token> sortedTokens = new 
ArrayList<Token>(tokensToEndpoints.keySet());
         Collections.sort(sortedTokens);
-        Map<Token, Float> token_map = 
getPartitioner().describeOwnership(sortedTokens);
-        Map<String, Float> string_map = new HashMap<String, Float>();
-        for(Map.Entry<Token, Float> entry : token_map.entrySet())
-        {
-            string_map.put(entry.getKey().toString(), entry.getValue());
-        }
-        return string_map;
+        // describeOwnership returns tokens in an unspecified order, let's 
re-order them
+        Map<Token, Float> tokenMap = new TreeMap<Token, 
Float>(getPartitioner().describeOwnership(sortedTokens));
+        Map<InetAddress, Float> stringMap = new LinkedHashMap<InetAddress, 
Float>();
+        for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
+            stringMap.put(tokensToEndpoints.get(entry.getKey()), 
entry.getValue());
+        return stringMap;
     }
 
-    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException
+    /**
+     * Calculates ownership. If there are multiple DC's and the replication 
strategy is DC aware then ownership will be
+     * calculated per dc, i.e. each DC will have total ring ownership divided 
amongst its nodes. Without replication
+     * total ownership will be a multiple of the number of DC's and this value 
will then go up within each DC depending
+     * on the number of replicas within itself. For DC unaware replication 
strategies, ownership without replication
+     * will be 100%.
+     * 
+     * @throws ConfigurationException
+     */
+    public LinkedHashMap<InetAddress, Float> effectiveOwnership(String 
keyspace) throws ConfigurationException
     {
-        Map<String, Float> effective = Maps.newHashMap();
         if (Schema.instance.getNonSystemTables().size() <= 0)
             throw new ConfigurationException("Couldn't find any Non System 
Keyspaces to infer replication topology");
         if (keyspace == null && 
!hasSameReplication(Schema.instance.getNonSystemTables()))
@@ -2783,22 +2771,65 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (keyspace == null)
             keyspace = Schema.instance.getNonSystemTables().get(0);
 
-        List<Token> sortedTokens = new 
ArrayList<Token>(tokenMetadata.getTokenToEndpointMapForReading().keySet());
-        Collections.sort(sortedTokens);
-        Map<Token, Float> ownership = 
getPartitioner().describeOwnership(sortedTokens);
+        final BiMap<InetAddress, Token> endpointsToTokens = 
ImmutableBiMap.copyOf(tokenMetadata.getTokenToEndpointMapForReading()).inverse();
 
-        for (Entry<InetAddress, Collection<Range<Token>>> ranges : 
constructEndpointToRangeMap(keyspace).entrySet())
+        Collection<Collection<InetAddress>> endpointsGroupedByDc = new 
ArrayList<Collection<InetAddress>>();
+        if (isDcAwareReplicationStrategy(keyspace))
+        {
+            // mapping of dc's to nodes, use sorted map so that we get dcs 
sorted
+            SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = 
new TreeMap<String, Collection<InetAddress>>();
+            
sortedDcsToEndpoints.putAll(tokenMetadata.getTopology().getDatacenterEndpoints().asMap());
+            for (Collection<InetAddress> endpoints : 
sortedDcsToEndpoints.values())
+                endpointsGroupedByDc.add(endpoints);
+        }
+        else
         {
-            Token token = tokenMetadata.getToken(ranges.getKey());
-            for (Range<Token> range: ranges.getValue())
+            endpointsGroupedByDc.add(endpointsToTokens.keySet());
+        }
+
+        LinkedHashMap<InetAddress, Float> finalOwnership = 
Maps.newLinkedHashMap();
+
+        // calculate ownership per dc
+        for (Collection<InetAddress> endpoints : endpointsGroupedByDc)
+        {
+            // sort the endpoints by their tokens
+            List<InetAddress> sortedEndpoints = 
Lists.newArrayListWithExpectedSize(endpoints.size());
+            sortedEndpoints.addAll(endpoints);
+
+            Collections.sort(sortedEndpoints, new Comparator<InetAddress>()
             {
-                float value = effective.get(token.toString()) == null ? 0.0F : 
effective.get(token.toString());
-                effective.put(token.toString(), value + 
ownership.get(range.left));
+                public int compare(InetAddress o1, InetAddress o2)
+                {
+                    return 
endpointsToTokens.get(o1).compareTo(endpointsToTokens.get(o2));
+                }
+            });
+
+            // calculate the ownership without replication
+            Function<InetAddress, Token> f = new Function<InetAddress, Token>()
+            {
+                public Token apply(InetAddress arg0)
+                {
+                    return endpointsToTokens.get(arg0);
+                }
+            };
+            Map<Token, Float> tokenOwnership = 
getPartitioner().describeOwnership(Lists.transform(sortedEndpoints, f));
+
+            // calculate the ownership with replication and add the endpoint 
to the final ownership map
+            for (InetAddress endpoint : endpoints)
+            {
+                float ownership = 0.0f;
+                for (Range<Token> range : getRangesForEndpoint(keyspace, 
endpoint))
+                {
+                    if (tokenOwnership.containsKey(range.left))
+                        ownership += tokenOwnership.get(range.left);
+                }
+                finalOwnership.put(endpoint, ownership);
             }
         }
-        return effective;
+        return finalOwnership;
     }
 
+
     private boolean hasSameReplication(List<String> list)
     {
         if (list.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ff1ec03..80ba05f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -328,7 +329,7 @@ public interface StorageServiceMBean
      * given a list of tokens (representing the nodes in the cluster), returns
      *   a mapping from "token -> %age of cluster owned by that token"
      */
-    public Map<String, Float> getOwnership();
+    public Map<InetAddress, Float> getOwnership();
 
     /**
      * Effective ownership is % of the data each node owns given the keyspace
@@ -337,7 +338,7 @@ public interface StorageServiceMBean
      * in the cluster have the same replication strategies and if yes then we 
will
      * use the first else a empty Map is returned.
      */
-    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException;
+    public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException;
 
     public List<String> getKeyspaces();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 315e9c3..808494e 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -28,8 +28,8 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.service.CacheServiceMBean;
-import org.apache.cassandra.service.StorageProxyMBean;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -38,6 +38,8 @@ import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
@@ -208,14 +210,53 @@ public class NodeCmd
 
     /**
      * Write a textual representation of the Cassandra ring.
-     *
-     * @param outs the stream to write to
+     * 
+     * @param outs
+     *            the stream to write to
      */
     public void printRing(PrintStream outs, String keyspace)
     {
-        Map<String, String> tokenToEndpoint = probe.getTokenToEndpointMap();
-        List<String> sortedTokens = new 
ArrayList<String>(tokenToEndpoint.keySet());
+        Map<String, String> endpointsToTokens = 
ImmutableBiMap.copyOf(probe.getTokenToEndpointMap()).inverse();
+        String format = "%-16s%-12s%-7s%-8s%-16s%-20s%-44s%n";
 
+        // Calculate per-token ownership of the ring
+        Map<InetAddress, Float> ownerships;
+        boolean keyspaceSelected;
+        try
+        {
+            ownerships = probe.effectiveOwnership(keyspace);
+            keyspaceSelected = true;
+        }
+        catch (ConfigurationException ex)
+        {
+            ownerships = probe.getOwnership();
+            outs.printf("Note: Ownership information does not include 
topology; for complete information, specify a keyspace%n");
+            keyspaceSelected = false;
+        }
+        try
+        {
+            outs.println();
+            Map<String, Map<InetAddress, Float>> perDcOwnerships = 
Maps.newLinkedHashMap();
+            // get the different datasets and map to tokens
+            for (Map.Entry<InetAddress, Float> ownership : 
ownerships.entrySet())
+            {
+                String dc = 
probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress());
+                if (!perDcOwnerships.containsKey(dc))
+                    perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, 
Float>());
+                perDcOwnerships.get(dc).put(ownership.getKey(), 
ownership.getValue());
+            }
+            for (Map.Entry<String, Map<InetAddress, Float>> entry : 
perDcOwnerships.entrySet())
+                printDc(outs, format, entry.getKey(), endpointsToTokens, 
keyspaceSelected, entry.getValue());
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private void printDc(PrintStream outs, String format, String dc, 
Map<String, String> endpointsToTokens,
+            boolean keyspaceSelected, Map<InetAddress, Float> 
filteredOwnerships)
+    {
         Collection<String> liveNodes = probe.getLiveNodes();
         Collection<String> deadNodes = probe.getUnreachableNodes();
         Collection<String> joiningNodes = probe.getJoiningNodes();
@@ -223,69 +264,64 @@ public class NodeCmd
         Collection<String> movingNodes = probe.getMovingNodes();
         Map<String, String> loadMap = probe.getLoadMap();
 
-        String format = "%-16s%-12s%-12s%-7s%-8s%-16s%-20s%-44s%n";
+        outs.println("Datacenter: " + dc);
+        outs.println("==========");
 
-        // Calculate per-token ownership of the ring
-        Map<String, Float> ownerships;
-        try
-        {
-            ownerships = probe.effectiveOwnership(keyspace);
-            outs.printf(format, "Address", "DC", "Rack", "Status", "State", 
"Load", "Effective-Ownership", "Token");
-        }
-        catch (ConfigurationException ex)
+        // get the total amount of replicas for this dc and the last token in 
this dc's ring
+        float totalReplicas = 0f;
+        String lastToken = "";
+        for (Map.Entry<InetAddress, Float> entry : 
filteredOwnerships.entrySet())
         {
-            ownerships = probe.getOwnership();
-            outs.printf("Note: Ownership information does not include 
topology, please specify a keyspace. %n");
-            outs.printf(format, "Address", "DC", "Rack", "Status", "State", 
"Load", "Owns", "Token");
+            lastToken = endpointsToTokens.get(entry.getKey().getHostAddress());
+            totalReplicas += entry.getValue();
         }
+        
+
+        if (keyspaceSelected)
+            outs.print("Replicas: " + (int) totalReplicas + "\n\n");
 
-        // show pre-wrap token twice so you can always read a node's range as
-        // (previous line token, current line token]
-        if (sortedTokens.size() > 1)
-            outs.printf(format, "", "", "", "", "", "", "", 
sortedTokens.get(sortedTokens.size() - 1));
+        outs.printf(format, "Address", "Rack", "Status", "State", "Load", 
"Owns", "Token");
 
-        for (String token : sortedTokens)
+        if (filteredOwnerships.size() > 1)
+            outs.printf(format, "", "", "", "", "", "", lastToken);
+        else
+            outs.println();
+
+        for (Map.Entry<InetAddress, Float> entry : 
filteredOwnerships.entrySet())
         {
-            String primaryEndpoint = tokenToEndpoint.get(token);
-            String dataCenter;
-            try
-            {
-                dataCenter = 
probe.getEndpointSnitchInfoProxy().getDatacenter(primaryEndpoint);
-            }
-            catch (UnknownHostException e)
-            {
-                dataCenter = "Unknown";
-            }
+            String endpoint = entry.getKey().getHostAddress();
+            String token = 
endpointsToTokens.get(entry.getKey().getHostAddress());
             String rack;
             try
             {
-                rack = 
probe.getEndpointSnitchInfoProxy().getRack(primaryEndpoint);
+                rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
             }
             catch (UnknownHostException e)
             {
                 rack = "Unknown";
             }
-            String status = liveNodes.contains(primaryEndpoint)
-                            ? "Up"
-                            : deadNodes.contains(primaryEndpoint)
-                              ? "Down"
-                              : "?";
+            String status = liveNodes.contains(endpoint)
+                    ? "Up"
+                    : deadNodes.contains(endpoint)
+                            ? "Down"
+                            : "?";
 
             String state = "Normal";
 
-            if (joiningNodes.contains(primaryEndpoint))
+            if (joiningNodes.contains(endpoint))
                 state = "Joining";
-            else if (leavingNodes.contains(primaryEndpoint))
+            else if (leavingNodes.contains(endpoint))
                 state = "Leaving";
-            else if (movingNodes.contains(primaryEndpoint))
+            else if (movingNodes.contains(endpoint))
                 state = "Moving";
 
-            String load = loadMap.containsKey(primaryEndpoint)
-                          ? loadMap.get(primaryEndpoint)
-                          : "?";
-            String owns = new 
DecimalFormat("##0.00%").format(ownerships.get(token) == null ? 0.0F : 
ownerships.get(token));
-            outs.printf(format, primaryEndpoint, dataCenter, rack, status, 
state, load, owns, token);
+            String load = loadMap.containsKey(endpoint)
+                    ? loadMap.get(endpoint)
+                    : "?";
+            String owns = new 
DecimalFormat("##0.00%").format(entry.getValue());
+            outs.printf(format, entry.getKey(), rack, status, state, load, 
owns, token);
         }
+        outs.println();
     }
 
     /** Writes a table of host IDs to a PrintStream */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e7d1f83/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index f6de85b..8ad8b92 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -263,12 +263,12 @@ public class NodeProbe
         return ssProxy.getLoadMap();
     }
 
-    public Map<String, Float> getOwnership()
+    public Map<InetAddress, Float> getOwnership()
     {
         return ssProxy.getOwnership();
     }
 
-    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException
+    public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException
     {
         return ssProxy.effectiveOwnership(keyspace);
     }

Reply via email to