Author: jbellis
Date: Fri Jun 25 22:11:54 2010
New Revision: 958133
URL: http://svn.apache.org/viewvc?rev=958133&view=rev
Log:
replace sorting of unwrapped range in token order, fixing a regression
introduced in r948934. patch by jbellis; reviewed by eevans for CASSANDRA-1198
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=958133&r1=958132&r2=958133&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 25 22:11:54 2010
@@ -1,6 +1,5 @@
0.6.3
* retry to make streaming connections up to 8 times. (CASSANDRA-1019)
- * fix potential for duplicate rows seen by Hadoop jobs (CASSANDRA-1042)
* reject describe_ring() calls on invalid keyspaces (CASSANDRA-1111)
* fix cache size calculation for size of 100% (CASSANDRA-1129)
* fix cache capacity only being recalculated once (CASSANDRA-1129)
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=958133&r1=958132&r2=958133&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Jun 25 22:11:54 2010
@@ -557,14 +557,18 @@ public class StorageProxy implements Sto
final String table = command.keyspace;
int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
consistency_level);
- List<Pair<AbstractBounds, List<InetAddress>>> ranges =
getRestrictedRanges(command.range, command.keyspace, responseCount);
+ List<AbstractBounds> ranges = getRestrictedRanges(command.range,
command.keyspace, responseCount);
// now scan until we have enough results
List<Row> rows = new ArrayList<Row>(command.max_keys);
- for (Pair<AbstractBounds, List<InetAddress>> pair :
getRangeIterator(ranges, command.range.left))
+ for (AbstractBounds range : getRangeIterator(ranges,
command.range.left))
{
- AbstractBounds range = pair.left;
- List<InetAddress> endpoints = pair.right;
+ List<InetAddress> liveEndpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+ if (liveEndpoints.size() < responseCount)
+ throw new UnavailableException();
+
DatabaseDescriptor.getEndPointSnitch(command.keyspace).sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
+ List<InetAddress> endpoints = liveEndpoints.subList(0,
responseCount);
+
RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace,
command.column_family, command.super_column, command.predicate, range,
command.max_keys);
Message message = c2.getMessage();
@@ -607,30 +611,30 @@ public class StorageProxy implements Sto
/**
* returns an iterator that will return ranges in ring order, starting
with the one that contains the start token
*/
- private static Iterable<Pair<AbstractBounds, List<InetAddress>>>
getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges,
Token start)
+ private static Iterable<AbstractBounds> getRangeIterator(final
List<AbstractBounds> ranges, Token start)
{
// find the one to start with
int i;
for (i = 0; i < ranges.size(); i++)
{
- AbstractBounds range = ranges.get(i).left;
+ AbstractBounds range = ranges.get(i);
if (range.contains(start) || range.left.equals(start))
break;
}
- AbstractBounds range = ranges.get(i).left;
+ AbstractBounds range = ranges.get(i);
assert range.contains(start) || range.left.equals(start); // make sure
the loop didn't just end b/c ranges were exhausted
// return an iterable that starts w/ the correct range and iterates
the rest in ring order
final int begin = i;
- return new Iterable<Pair<AbstractBounds, List<InetAddress>>>()
+ return new Iterable<AbstractBounds>()
{
- public Iterator<Pair<AbstractBounds, List<InetAddress>>> iterator()
+ public Iterator<AbstractBounds> iterator()
{
- return new AbstractIterator<Pair<AbstractBounds,
List<InetAddress>>>()
+ return new AbstractIterator<AbstractBounds>()
{
int n = 0;
- protected Pair<AbstractBounds, List<InetAddress>>
computeNext()
+ protected AbstractBounds computeNext()
{
if (n == ranges.size())
return endOfData();
@@ -655,33 +659,39 @@ public class StorageProxy implements Sto
* D, but we don't want any other results from it until after the (D,
T] range. Unwrapping so that
* the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
*/
- private static List<Pair<AbstractBounds, List<InetAddress>>>
getRestrictedRanges(AbstractBounds queryRange, String keyspace, int
responseCount)
+ private static List<AbstractBounds> getRestrictedRanges(AbstractBounds
queryRange, String keyspace, int responseCount)
throws UnavailableException
{
TokenMetadata tokenMetadata =
StorageService.instance.getTokenMetadata();
- Iterator<Token> iter =
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left);
- List<Pair<AbstractBounds, List<InetAddress>>> ranges = new
ArrayList<Pair<AbstractBounds, List<InetAddress>>>();
- while (iter.hasNext())
+
+ List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
+ // for each node, compute its intersection with the query range, and
add its unwrapped components to our list
+ for (Token nodeToken : tokenMetadata.sortedTokens())
{
- Token nodeToken = iter.next();
Range nodeRange = new
Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
- if (endpoints.size() < responseCount)
- throw new UnavailableException();
-
-
DatabaseDescriptor.getEndPointSnitch(keyspace).sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
- List<InetAddress> endpointsForCL = endpoints.subList(0,
responseCount);
- Set<AbstractBounds> restrictedRanges =
queryRange.restrictTo(nodeRange);
- for (AbstractBounds range : restrictedRanges)
+ for (AbstractBounds range : queryRange.restrictTo(nodeRange))
{
for (AbstractBounds unwrapped : range.unwrap())
{
if (logger.isDebugEnabled())
logger.debug("Adding to restricted ranges " +
unwrapped + " for " + nodeRange);
- ranges.add(new Pair<AbstractBounds,
List<InetAddress>>(unwrapped, endpointsForCL));
+ ranges.add(unwrapped);
}
}
}
+
+ // re-sort ranges in ring order, post-unwrapping
+ Comparator<AbstractBounds> comparator = new
Comparator<AbstractBounds>()
+ {
+ public int compare(AbstractBounds o1, AbstractBounds o2)
+ {
+ // no restricted ranges will overlap so we don't need to worry
about inclusive vs exclusive left,
+ // just sort by raw token position.
+ return o1.left.compareTo(o2.left);
+ }
+ };
+ Collections.sort(ranges, comparator);
+
return ranges;
}