Author: jbellis
Date: Sun Feb 7 02:21:32 2010
New Revision: 907369
URL: http://svn.apache.org/viewvc?rev=907369&view=rev
Log:
iterate ranges rather than endpoints, and sort endpoints by proximity
patch by jbellis; reviewed by Stu Hood for CASSANDRA-771
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
Sun Feb 7 02:21:32 2010
@@ -155,10 +155,10 @@
while (forloopReturn.size() < replicas_ && iter.hasNext())
{
Token t = iter.next();
- InetAddress endPointOfIntrest = metadata.getEndPoint(t);
+ InetAddress endPointOfInterest = metadata.getEndPoint(t);
if (forloopReturn.size() < replicas_ - 1)
{
- forloopReturn.add(endPointOfIntrest);
+ forloopReturn.add(endPointOfInterest);
continue;
}
else
@@ -169,7 +169,7 @@
// Now try to find one on a different rack
if (!bOtherRack)
{
- if
(!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost,
endPointOfIntrest))
+ if
(!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost,
endPointOfInterest))
{
forloopReturn.add(metadata.getEndPoint(t));
bOtherRack = true;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Sun Feb 7 02:21:32 2010
@@ -535,39 +535,47 @@
{
long startTime = System.nanoTime();
TokenMetadata tokenMetadata =
StorageService.instance.getTokenMetadata();
+ Iterator<Token> iter =
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), command.range.left);
- InetAddress endPoint =
StorageService.instance.getPrimary(command.range.left);
- InetAddress startEndpoint = endPoint;
final String table = command.keyspace;
int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
DatabaseDescriptor.getReplicationFactor(table), consistency_level);
- // starting with the node that is primary for the start key, scan
until either we have enough results,
+ // starting with the range containing the start key, scan until either
we have enough results,
// or the node scan reports that it was done (i.e., encountered a key
outside the desired range).
Map<String, ColumnFamily> rows = new HashMap<String,
ColumnFamily>(command.max_keys);
outer:
- do
+ while (iter.hasNext())
{
- Range primaryRange =
StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace,
primaryRange.right);
+ Token currentToken = iter.next();
+ Range currentRange = new
Range(tokenMetadata.getPredecessor(currentToken), currentToken);
+ List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, currentToken);
if (endpoints.size() < responseCount)
throw new UnavailableException();
+
DatabaseDescriptor.getEndPointSnitch(command.keyspace).sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
+
+ // make sure we only get keys from the current range (and not
other replicas that might be on the nodes).
+ // usually this will be only one range, but sometimes the
intersection of a wrapping Range with a non-wrapping
+ // is two disjoint, non-wrapping Ranges separated by a gap.
+ List<AbstractBounds> restricted =
command.range.restrictTo(currentRange);
- // to make comparing the results from each node easy, we restrict
each scan the primary range for the node in question
- List<AbstractBounds> restricted =
command.range.restrictTo(primaryRange);
for (AbstractBounds range : restricted)
{
RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace,
command.column_family, command.super_column, command.predicate, range,
command.max_keys);
Message message = c2.getMessage();
// collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+ RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, currentRange, endpoints);
QuorumResponseHandler<Map<String, ColumnFamily>> handler = new
QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
- if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " for " + range + " from "
+ message.getMessageId() + "@" + endPoint);
- for (InetAddress replicaEndpoint : endpoints)
+
+ Iterator<InetAddress> endpointIter = endpoints.iterator();
+ for (int i = 0; i < responseCount; i++)
{
- MessagingService.instance.sendRR(message, replicaEndpoint,
handler);
+ InetAddress endpoint = endpointIter.next();
+ MessagingService.instance.sendRR(message, endpoint,
handler);
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + c2 + " for " + range + "
from " + message.getMessageId() + "@" + endpoint);
}
+ // TODO read repair on remaining replicas?
// if we're done, great, otherwise, move to the next range
try
@@ -581,10 +589,7 @@
if (rows.size() >= command.max_keys || resolver.completed())
break outer;
}
-
- endPoint = tokenMetadata.getSuccessor(endPoint);
}
- while (!endPoint.equals(startEndpoint));
List<Pair<String, ColumnFamily>> results = new ArrayList<Pair<String,
ColumnFamily>>(rows.size());
for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=907369&r1=907368&r2=907369&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Sun Feb 7 02:21:32 2010
@@ -1095,40 +1095,6 @@
}
/**
- * This method returns the endpoint that is responsible for storing the
- * specified key.
- *
- * @param key - key for which we need to find the endpoint
- * @return value - the endpoint responsible for this key
- */
- public InetAddress getPrimary(String key)
- {
- return getPrimary(partitioner_.getToken(key));
- }
-
- public InetAddress getPrimary(Token token)
- {
- List tokens = tokenMetadata_.sortedTokens();
- if (tokens.size() > 0)
- {
- return
tokenMetadata_.getEndPoint(TokenMetadata.ringIterator(tokens, token).next());
- }
- return FBUtilities.getLocalAddress();
- }
-
- /**
- * This method determines whether the local endpoint is the
- * primary for the given key.
- * @param key
- * @return true if the local endpoint is the primary replica.
- */
- public boolean isPrimary(String key)
- {
- InetAddress endpoint = getPrimary(key);
- return FBUtilities.getLocalAddress().equals(endpoint);
- }
-
- /**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
*