Author: jbellis
Date: Fri Jun 25 22:11:54 2010
New Revision: 958133

URL: http://svn.apache.org/viewvc?rev=958133&view=rev
Log:
restore sorting of unwrapped ranges in token order, fixing a regression
introduced in r948934.  patch by jbellis; reviewed by eevans for CASSANDRA-1198

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=958133&r1=958132&r2=958133&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 25 22:11:54 2010
@@ -1,6 +1,5 @@
 0.6.3
  * retry to make streaming connections up to 8 times. (CASSANDRA-1019)
- * fix potential for duplicate rows seen by Hadoop jobs (CASSANDRA-1042)
  * reject describe_ring() calls on invalid keyspaces (CASSANDRA-1111)
  * fix cache size calculation for size of 100% (CASSANDRA-1129)
  * fix cache capacity only being recalculated once (CASSANDRA-1129)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=958133&r1=958132&r2=958133&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jun 25 22:11:54 2010
@@ -557,14 +557,18 @@ public class StorageProxy implements Sto
         final String table = command.keyspace;
         int responseCount = 
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), 
consistency_level);
 
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = 
getRestrictedRanges(command.range, command.keyspace, responseCount);
+        List<AbstractBounds> ranges = getRestrictedRanges(command.range, 
command.keyspace, responseCount);
 
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
-        for (Pair<AbstractBounds, List<InetAddress>> pair : 
getRangeIterator(ranges, command.range.left))
+        for (AbstractBounds range : getRangeIterator(ranges, 
command.range.left))
         {
-            AbstractBounds range = pair.left;
-            List<InetAddress> endpoints = pair.right;
+            List<InetAddress> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+            if (liveEndpoints.size() < responseCount)
+                throw new UnavailableException();
+            
DatabaseDescriptor.getEndPointSnitch(command.keyspace).sortByProximity(FBUtilities.getLocalAddress(),
 liveEndpoints);
+            List<InetAddress> endpoints = liveEndpoints.subList(0, 
responseCount);
+
             RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, 
command.column_family, command.super_column, command.predicate, range, 
command.max_keys);
             Message message = c2.getMessage();
 
@@ -607,30 +611,30 @@ public class StorageProxy implements Sto
     /**
      * returns an iterator that will return ranges in ring order, starting 
with the one that contains the start token
      */
-    private static Iterable<Pair<AbstractBounds, List<InetAddress>>> 
getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, 
Token start)
+    private static Iterable<AbstractBounds> getRangeIterator(final 
List<AbstractBounds> ranges, Token start)
     {
         // find the one to start with
         int i;
         for (i = 0; i < ranges.size(); i++)
         {
-            AbstractBounds range = ranges.get(i).left;
+            AbstractBounds range = ranges.get(i);
             if (range.contains(start) || range.left.equals(start))
                 break;
         }
-        AbstractBounds range = ranges.get(i).left;
+        AbstractBounds range = ranges.get(i);
         assert range.contains(start) || range.left.equals(start); // make sure 
the loop didn't just end b/c ranges were exhausted
 
         // return an iterable that starts w/ the correct range and iterates 
the rest in ring order
         final int begin = i;
-        return new Iterable<Pair<AbstractBounds, List<InetAddress>>>()
+        return new Iterable<AbstractBounds>()
         {
-            public Iterator<Pair<AbstractBounds, List<InetAddress>>> iterator()
+            public Iterator<AbstractBounds> iterator()
             {
-                return new AbstractIterator<Pair<AbstractBounds, 
List<InetAddress>>>()
+                return new AbstractIterator<AbstractBounds>()
                 {
                     int n = 0;
 
-                    protected Pair<AbstractBounds, List<InetAddress>> 
computeNext()
+                    protected AbstractBounds computeNext()
                     {
                         if (n == ranges.size())
                             return endOfData();
@@ -655,33 +659,39 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, 
T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<Pair<AbstractBounds, List<InetAddress>>> 
getRestrictedRanges(AbstractBounds queryRange, String keyspace, int 
responseCount)
+    private static List<AbstractBounds> getRestrictedRanges(AbstractBounds 
queryRange, String keyspace, int responseCount)
     throws UnavailableException
     {
         TokenMetadata tokenMetadata = 
StorageService.instance.getTokenMetadata();
-        Iterator<Token> iter = 
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left);
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = new 
ArrayList<Pair<AbstractBounds, List<InetAddress>>>();
-        while (iter.hasNext())
+
+        List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
+        // for each node, compute its intersection with the query range, and 
add its unwrapped components to our list
+        for (Token nodeToken : tokenMetadata.sortedTokens())
         {
-            Token nodeToken = iter.next();
             Range nodeRange = new 
Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
-            List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
-            if (endpoints.size() < responseCount)
-                throw new UnavailableException();
-
-            
DatabaseDescriptor.getEndPointSnitch(keyspace).sortByProximity(FBUtilities.getLocalAddress(),
 endpoints);
-            List<InetAddress> endpointsForCL = endpoints.subList(0, 
responseCount);
-            Set<AbstractBounds> restrictedRanges = 
queryRange.restrictTo(nodeRange);
-            for (AbstractBounds range : restrictedRanges)
+            for (AbstractBounds range : queryRange.restrictTo(nodeRange))
             {
                 for (AbstractBounds unwrapped : range.unwrap())
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Adding to restricted ranges " + 
unwrapped + " for " + nodeRange);
-                    ranges.add(new Pair<AbstractBounds, 
List<InetAddress>>(unwrapped, endpointsForCL));
+                    ranges.add(unwrapped);
                 }
             }
         }
+
+        // re-sort ranges in ring order, post-unwrapping
+        Comparator<AbstractBounds> comparator = new 
Comparator<AbstractBounds>()
+        {
+            public int compare(AbstractBounds o1, AbstractBounds o2)
+            {
+                // no restricted ranges will overlap so we don't need to worry 
about inclusive vs exclusive left,
+                // just sort by raw token position.
+                return o1.left.compareTo(o2.left);
+            }
+        };
+        Collections.sort(ranges, comparator);
+
         return ranges;
     }
 


Reply via email to