Author: jbellis
Date: Sun Feb  7 02:21:32 2010
New Revision: 907369

URL: http://svn.apache.org/viewvc?rev=907369&view=rev
Log:
iterate ranges rather than endpoints, and sort endpoints by proximity
patch by jbellis; reviewed by Stu Hood for CASSANDRA-771

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Sun Feb  7 02:21:32 2010
@@ -155,10 +155,10 @@
             while (forloopReturn.size() < replicas_ && iter.hasNext())
             {
                 Token t = iter.next();
-                InetAddress endPointOfIntrest = metadata.getEndPoint(t);
+                InetAddress endPointOfInterest = metadata.getEndPoint(t);
                 if (forloopReturn.size() < replicas_ - 1)
                 {
-                    forloopReturn.add(endPointOfIntrest);
+                    forloopReturn.add(endPointOfInterest);
                     continue;
                 }
                 else
@@ -169,7 +169,7 @@
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
-                    if 
(!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, 
endPointOfIntrest))
+                    if 
(!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, 
endPointOfInterest))
                     {
                         forloopReturn.add(metadata.getEndPoint(t));
                         bOtherRack = true;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sun Feb  7 02:21:32 2010
@@ -535,39 +535,47 @@
     {
         long startTime = System.nanoTime();
         TokenMetadata tokenMetadata = 
StorageService.instance.getTokenMetadata();
+        Iterator<Token> iter = 
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), command.range.left);
 
-        InetAddress endPoint = 
StorageService.instance.getPrimary(command.range.left);
-        InetAddress startEndpoint = endPoint;
         final String table = command.keyspace;
         int responseCount = 
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), 
DatabaseDescriptor.getReplicationFactor(table), consistency_level);
 
-        // starting with the node that is primary for the start key, scan 
until either we have enough results,
+        // starting with the range containing the start key, scan until either 
we have enough results,
         // or the node scan reports that it was done (i.e., encountered a key 
outside the desired range).
         Map<String, ColumnFamily> rows = new HashMap<String, 
ColumnFamily>(command.max_keys);
         outer:
-        do
+        while (iter.hasNext())
         {
-            Range primaryRange = 
StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
-            List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, 
primaryRange.right);
+            Token currentToken = iter.next();
+            Range currentRange = new 
Range(tokenMetadata.getPredecessor(currentToken), currentToken);
+            List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, currentToken);
             if (endpoints.size() < responseCount)
                 throw new UnavailableException();
+            
DatabaseDescriptor.getEndPointSnitch(command.keyspace).sortByProximity(FBUtilities.getLocalAddress(),
 endpoints);
+
+            // make sure we only get keys from the current range (and not 
other replicas that might be on the nodes).
+            // usually this will be only one range, but sometimes the 
intersection of a wrapping Range with a non-wrapping
+            // is two disjoint, non-wrapping Ranges separated by a gap.
+            List<AbstractBounds> restricted = 
command.range.restrictTo(currentRange);
 
-            // to make comparing the results from each node easy, we restrict 
each scan the primary range for the node in question
-            List<AbstractBounds> restricted = 
command.range.restrictTo(primaryRange);
             for (AbstractBounds range : restricted)
             {
                 RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, 
command.column_family, command.super_column, command.predicate, range, 
command.max_keys);
                 Message message = c2.getMessage();
 
                 // collect replies and resolve according to consistency level
-                RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+                RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, currentRange, endpoints);
                 QuorumResponseHandler<Map<String, ColumnFamily>> handler = new 
QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
-                if (logger.isDebugEnabled())
-                    logger.debug("reading " + c2 + " for " + range + " from " 
+ message.getMessageId() + "@" + endPoint);
-                for (InetAddress replicaEndpoint : endpoints)
+
+                Iterator<InetAddress> endpointIter = endpoints.iterator();
+                for (int i = 0; i < responseCount; i++)
                 {
-                    MessagingService.instance.sendRR(message, replicaEndpoint, 
handler);
+                    InetAddress endpoint = endpointIter.next();
+                    MessagingService.instance.sendRR(message, endpoint, 
handler);
+                    if (logger.isDebugEnabled())
+                        logger.debug("reading " + c2 + " for " + range + " 
from " + message.getMessageId() + "@" + endpoint);
                 }
+                // TODO read repair on remaining replicas?
 
                 // if we're done, great, otherwise, move to the next range
                 try
@@ -581,10 +589,7 @@
                 if (rows.size() >= command.max_keys || resolver.completed())
                     break outer;
             }
-
-            endPoint = tokenMetadata.getSuccessor(endPoint);
         }
-        while (!endPoint.equals(startEndpoint));
 
         List<Pair<String, ColumnFamily>> results = new ArrayList<Pair<String, 
ColumnFamily>>(rows.size());
         for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Feb  7 02:21:32 2010
@@ -1095,40 +1095,6 @@
     }
 
     /**
-     * This method returns the endpoint that is responsible for storing the
-     * specified key.
-     *
-     * @param key - key for which we need to find the endpoint
-     * @return value - the endpoint responsible for this key
-     */
-    public InetAddress getPrimary(String key)
-    {
-        return getPrimary(partitioner_.getToken(key));
-    }
-
-    public InetAddress getPrimary(Token token)
-    {
-        List tokens = tokenMetadata_.sortedTokens();
-        if (tokens.size() > 0)
-        {
-            return 
tokenMetadata_.getEndPoint(TokenMetadata.ringIterator(tokens, token).next());
-        }
-        return FBUtilities.getLocalAddress();
-    }
-
-    /**
-     * This method determines whether the local endpoint is the
-     * primary for the given key.
-     * @param key
-     * @return true if the local endpoint is the primary replica.
-    */
-    public boolean isPrimary(String key)
-    {
-        InetAddress endpoint = getPrimary(key);
-        return FBUtilities.getLocalAddress().equals(endpoint);
-    }
-
-    /**
      * This method returns the N endpoints that are responsible for storing the
      * specified key i.e for replication.
      *


Reply via email to