Author: jbellis
Date: Wed Oct 21 19:18:43 2009
New Revision: 828148
URL: http://svn.apache.org/viewvc?rev=828148&view=rev
Log:
Rename "readstorage" endpoints to "natural", "writestorage" to "write", and
"hintedstorage" to "hinted".
Make ReplicationStrategy deal with List<InetAddress> instead of InetAddress[]
-- we use the former both in RS and externally, so forcing RS to convert to []
and external users to convert back to List is silly.
patch by jbellis
Modified:
incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Modified: incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
(original)
+++ incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Wed
Oct 21 19:18:43 2009
@@ -171,7 +171,7 @@
/* Get serialized message to send to cluster */
message = createMessage(Keyspace, key.toString(), CFName,
columnFamilies);
- for (InetAddress endpoint:
StorageService.instance().getReadStorageEndPoints(key.toString()))
+ for (InetAddress endpoint:
StorageService.instance().getNaturalEndPoints(key.toString()))
{
/* Send message to end point */
MessagingService.instance().sendOneWay(message, endpoint);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
Wed Oct 21 19:18:43 2009
@@ -17,10 +17,8 @@
*/
package org.apache.cassandra.client;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
@@ -112,8 +110,8 @@
}
}
- public InetAddress[] getEndPoint(String key)
+ public List<InetAddress> getEndPoint(String key)
{
- return nodePicker_.getReadStorageEndPoints(partitioner_.getToken(key));
+ return nodePicker_.getNaturalEndpoints(partitioner_.getToken(key));
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Wed Oct 21 19:18:43 2009
@@ -113,7 +113,7 @@
RowMutation rm = new RowMutation(tableName, row);
Message message = rm.makeRowMutationMessage();
QuorumResponseHandler<Boolean> quorumResponseHandler = new
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
- MessagingService.instance().sendRR(message, new InetAddress[]{
endPoint }, quorumResponseHandler);
+ MessagingService.instance().sendRR(message, new InetAddress[] {
endPoint }, quorumResponseHandler);
return quorumResponseHandler.get();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Wed Oct 21 19:18:43 2009
@@ -116,7 +116,7 @@
private void doReadRepair(Row row, ReadCommand readCommand)
{
- List<InetAddress> endpoints =
StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
+ List<InetAddress> endpoints =
StorageService.instance().getLiveNaturalEndpoints(readCommand.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Wed Oct 21 19:18:43 2009
@@ -36,6 +36,7 @@
import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
+ import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.*;
@@ -77,14 +78,14 @@
private static final ExecutorService bootstrapExecutor_ = new
DebuggableThreadPoolExecutor("BOOT-STRAPPER");
/* endpoints that need to be bootstrapped */
- protected InetAddress[] targets_ = new InetAddress[0];
+ protected List<InetAddress> targets_;
/* tokens of the nodes being bootstrapped. */
protected final Token[] tokens_;
protected TokenMetadata tokenMetadata_ = null;
- public BootStrapper(InetAddress[] target, Token... token)
+ public BootStrapper(List<InetAddress> targets, Token... token)
{
- targets_ = target;
+ targets_ = targets;
tokens_ = token;
tokenMetadata_ = StorageService.instance().getTokenMetadata();
}
@@ -94,14 +95,14 @@
try
{
// Mark as not bootstrapping to calculate ranges correctly
- for (int i=0; i< targets_.length; i++)
+ for (int i=0; i< targets_.size(); i++)
{
- tokenMetadata_.update(tokens_[i], targets_[i], false);
+ tokenMetadata_.update(tokens_[i], targets_.get(i), false);
}
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget =
getRangesWithSourceTarget();
if (logger_.isDebugEnabled())
- logger_.debug("Beginning bootstrap process for " +
Arrays.toString(targets_) + " ...");
+ logger_.debug("Beginning bootstrap process for [" +
StringUtils.join(targets_, ", ") + "] ...");
/* Send messages to respective folks to stream data over to the
new nodes being bootstrapped */
LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
@@ -215,7 +216,7 @@
}
}
- BootStrapper bs = new BootStrapper(new InetAddress[] {
FBUtilities.getLocalAddress() }, ss.getLocalToken());
+ BootStrapper bs = new
BootStrapper(Arrays.asList(FBUtilities.getLocalAddress()), ss.getLocalToken());
bootstrapExecutor_.submit(bs);
Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE,
new ApplicationState(""));
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
Wed Oct 21 19:18:43 2009
@@ -18,13 +18,7 @@
package org.apache.cassandra.dht;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
+ import java.util.*;
import org.apache.log4j.Logger;
@@ -45,13 +39,13 @@
private static Logger logger_ =
Logger.getLogger(LeaveJoinProtocolImpl.class);
/* endpoints that are to be moved. */
- protected InetAddress[] targets_ = new InetAddress[0];
+ protected List<InetAddress> targets_;
/* position where they need to be moved */
protected final Token[] tokens_;
/* token metadata information */
protected TokenMetadata tokenMetadata_ = null;
- public LeaveJoinProtocolImpl(InetAddress[] targets, Token[] tokens)
+ public LeaveJoinProtocolImpl(List<InetAddress> targets, Token[] tokens)
{
targets_ = targets;
tokens_ = tokens;
@@ -92,10 +86,10 @@
/* Re-calculate the new ranges after the new token positions are
added */
Range[] newRanges =
StorageService.instance().getAllRanges(oldTokens);
/* Remove the old locations from tokenToEndPointMap and add the
new locations they are moving to */
- for ( int i = 0; i < targets_.length; ++i )
+ for ( int i = 0; i < targets_.size(); ++i )
{
- tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i])
);
- tokenToEndPointMap.put(tokens_[i], targets_[i]);
+ tokenToEndPointMap.remove(
endpointToTokenMap.get(targets_.get(i)) );
+ tokenToEndPointMap.put(tokens_[i], targets_.get(i));
}
/* Calculate the list of nodes that handle the new ranges */
Map<Range, List<InetAddress>> newRangeToEndPointMap =
StorageService.instance().constructRangeToEndPointMap(newRanges,
tokenToEndPointMap);
@@ -286,7 +280,7 @@
ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"),
InetAddress.getByName("G"));
ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"),
InetAddress.getByName("H"));
- Runnable runnable = new LeaveJoinProtocolImpl( new
InetAddress[]{InetAddress.getByName("C"), InetAddress.getByName("D")}, new
Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+ Runnable runnable = new
LeaveJoinProtocolImpl(Arrays.asList(InetAddress.getByName("C"),
InetAddress.getByName("D")), new Token[]{new BigIntegerToken("22"), new
BigIntegerToken("23")});
runnable.run();
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Oct 21 19:18:43 2009
@@ -52,11 +52,11 @@
storagePort_ = storagePort;
}
- public abstract InetAddress[] getReadStorageEndPoints(Token token,
Map<Token, InetAddress> tokenToEndPointMap);
+ public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token,
Map<Token, InetAddress> tokenToEndPointMap);
- public InetAddress[] getReadStorageEndPoints(Token token)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token)
{
- return getReadStorageEndPoints(token,
tokenMetadata_.cloneTokenEndPointMap());
+ return getNaturalEndpoints(token,
tokenMetadata_.cloneTokenEndPointMap());
}
/*
@@ -64,9 +64,9 @@
* on which the data is being placed and the value is the
* endpoint to which it should be forwarded.
*/
- public Map<InetAddress, InetAddress> getHintedStorageEndPoints(Token
token, InetAddress[] naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedEndpoints(Token token,
Collection<InetAddress> naturalEndpoints)
{
- return getHintedMapForEndpoints(getWriteStorageEndPoints(token,
naturalEndpoints));
+ return getHintedMapForEndpoints(getWriteEndpoints(token,
naturalEndpoints));
}
/**
@@ -77,11 +77,12 @@
*
* Only ReplicationStrategy should care about this method (higher level
users should only ask for Hinted).
*/
- public InetAddress[] getWriteStorageEndPoints(Token token, InetAddress[]
naturalEndpoints)
+ public ArrayList<InetAddress> getWriteEndpoints(Token token,
Collection<InetAddress> naturalEndpoints)
{
Map<Token, InetAddress> tokenToEndPointMap =
tokenMetadata_.cloneTokenEndPointMap();
Map<Token, InetAddress> bootstrapTokensToEndpointMap =
tokenMetadata_.cloneBootstrapNodes();
- ArrayList<InetAddress> list = new
ArrayList<InetAddress>(Arrays.asList(getReadStorageEndPoints(token,
tokenToEndPointMap)));
+ ArrayList<InetAddress> endpoints = new
ArrayList<InetAddress>(naturalEndpoints);
+
for (Token t : bootstrapTokensToEndpointMap.keySet())
{
InetAddress ep = bootstrapTokensToEndpointMap.get(t);
@@ -92,7 +93,7 @@
{
if (r.contains(token))
{
- list.add(ep);
+ endpoints.add(ep);
break;
}
}
@@ -102,10 +103,11 @@
tokenToEndPointMap.remove(t);
}
}
- return list.toArray(new InetAddress[list.size()]);
+
+ return endpoints;
}
- private Map<InetAddress, InetAddress>
getHintedMapForEndpoints(InetAddress[] topN)
+ private Map<InetAddress, InetAddress>
getHintedMapForEndpoints(Iterable<InetAddress> topN)
{
Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
Map<InetAddress, InetAddress> map = new HashMap<InetAddress,
InetAddress>();
@@ -168,7 +170,7 @@
for (Token token : tokenMap.keySet())
{
Range range = getPrimaryRangeFor(token, tokenMap);
- for (InetAddress ep : getReadStorageEndPoints(token, tokenMap))
+ for (InetAddress ep : getNaturalEndpoints(token, tokenMap))
{
map.get(ep).add(range);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
Wed Oct 21 19:18:43 2009
@@ -44,10 +44,10 @@
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public InetAddress[] getReadStorageEndPoints(Token token, Map<Token,
InetAddress> tokenToEndPointMap)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, Map<Token,
InetAddress> tokenToEndPointMap)
{
int startIndex;
- List<InetAddress> list = new ArrayList<InetAddress>();
+ ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
boolean bDataCenter = false;
boolean bOtherRack = false;
int foundCount = 0;
@@ -62,11 +62,11 @@
}
int totalNodes = tokens.size();
// Add the node at the index by default
- list.add(tokenToEndPointMap.get(tokens.get(index)));
+ endpoints.add(tokenToEndPointMap.get(tokens.get(index)));
foundCount++;
- if( replicas_ == 1 )
+ if (replicas_ == 1)
{
- return list.toArray(new InetAddress[list.size()]);
+ return endpoints;
}
startIndex = (index + 1)%totalNodes;
IEndPointSnitch endPointSnitch =
StorageService.instance().getEndPointSnitch();
@@ -81,7 +81,7 @@
// If we have already found something in a diff datacenter
no need to find another
if( !bDataCenter )
{
- list.add(tokenToEndPointMap.get(tokens.get(i)));
+ endpoints.add(tokenToEndPointMap.get(tokens.get(i)));
bDataCenter = true;
foundCount++;
}
@@ -94,7 +94,7 @@
// If we have already found something in a diff rack no
need to find another
if( !bOtherRack )
{
- list.add(tokenToEndPointMap.get(tokens.get(i)));
+ endpoints.add(tokenToEndPointMap.get(tokens.get(i)));
bOtherRack = true;
foundCount++;
}
@@ -111,12 +111,12 @@
// loop through the list and add until we have N nodes.
for (int i = startIndex, count = 1; count < totalNodes && foundCount <
replicas_; ++count, i = (i+1)%totalNodes)
{
- if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ if( ! endpoints.contains(tokenToEndPointMap.get(tokens.get(i))))
{
- list.add(tokenToEndPointMap.get(tokens.get(i)));
+ endpoints.add(tokenToEndPointMap.get(tokens.get(i)));
foundCount++;
}
}
- return list.toArray(new InetAddress[list.size()]);
+ return endpoints;
}
}
\ No newline at end of file
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
Wed Oct 21 19:18:43 2009
@@ -40,7 +40,7 @@
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public InetAddress[] getReadStorageEndPoints(Token token, Map<Token,
InetAddress> tokenToEndPointMap)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, Map<Token,
InetAddress> tokenToEndPointMap)
{
int startIndex;
List<Token> tokenList = new ArrayList<Token>();
@@ -74,9 +74,9 @@
foundCount++;
}
}
- List<InetAddress> list = new ArrayList<InetAddress>();
+ ArrayList<InetAddress> endpoints = new
ArrayList<InetAddress>(tokenList.size());
for (Token t: tokenList)
- list.add(tokenToEndPointMap.get(t));
- return list.toArray(new InetAddress[list.size()]);
+ endpoints.add(tokenToEndPointMap.get(t));
+ return endpoints;
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Oct 21 19:18:43 2009
@@ -114,9 +114,9 @@
long startTime = System.currentTimeMillis();
try
{
- InetAddress[] naturalEndpoints =
StorageService.instance().getReadStorageEndPoints(rm.key());
+ List<InetAddress> naturalEndpoints =
StorageService.instance().getNaturalEndpoints(rm.key());
// (This is the ZERO consistency level, so user doesn't care if we
don't really have N destinations available.)
- Map<InetAddress, InetAddress> endpointMap =
StorageService.instance().getHintedStorageEndpointMap(rm.key(),
naturalEndpoints);
+ Map<InetAddress, InetAddress> endpointMap =
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
Map<InetAddress, Message> messageMap =
createWriteMessages(rm, endpointMap);
for (Map.Entry<InetAddress, Message> entry :
messageMap.entrySet())
{
@@ -151,9 +151,9 @@
}
try
{
- InetAddress[] naturalEndpoints =
StorageService.instance().getReadStorageEndPoints(rm.key());
- Map<InetAddress, InetAddress> endpointMap =
StorageService.instance().getHintedStorageEndpointMap(rm.key(),
naturalEndpoints);
- int blockFor = determineBlockFor(naturalEndpoints.length,
endpointMap.size(), consistency_level);
+ List<InetAddress> naturalEndpoints =
StorageService.instance().getNaturalEndpoints(rm.key());
+ Map<InetAddress, InetAddress> endpointMap =
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+ int blockFor = determineBlockFor(naturalEndpoints.size(),
endpointMap.size(), consistency_level);
List<InetAddress> primaryNodes = getUnhintedNodes(endpointMap);
if (primaryNodes.size() < blockFor) // guarantee blockFor = W live
nodes.
{
@@ -298,7 +298,7 @@
for (ReadCommand command: commands)
{
- InetAddress[] endpoints =
StorageService.instance().getReadStorageEndPoints(command.key);
+ List<InetAddress> endpoints =
StorageService.instance().getNaturalEndpoints(command.key);
boolean foundLocal =
Arrays.asList(endpoints).contains(FBUtilities.getLocalAddress());
//TODO: Throw InvalidRequest if we're in bootstrap mode?
if (foundLocal && !StorageService.instance().isBootstrapMode())
@@ -359,7 +359,7 @@
QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new
ReadResponseResolver());
InetAddress dataPoint =
StorageService.instance().findSuitableEndPoint(command.key);
- List<InetAddress> endpointList = new
ArrayList<InetAddress>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
+ List<InetAddress> endpointList =
StorageService.instance().getNaturalEndpoints(command.key);
/* Remove the local storage endpoint from the list. */
endpointList.remove(dataPoint);
InetAddress[] endPoints = new InetAddress[endpointList.size() + 1];
@@ -442,7 +442,7 @@
List<Row> rows = new ArrayList<Row>();
for (ReadCommand command: commands)
{
- List<InetAddress> endpoints =
StorageService.instance().getLiveReadStorageEndPoints(command.key);
+ List<InetAddress> endpoints =
StorageService.instance().getLiveNaturalEndpoints(command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());
// TODO: throw a thrift exception if we do not have N nodes
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Oct 21 19:18:43 2009
@@ -338,9 +338,7 @@
Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range,
List<InetAddress>>();
for (Range range : ranges)
{
- InetAddress[] endpoints =
replicationStrategy_.getReadStorageEndPoints(range.right());
- // create a new ArrayList since a bunch of methods like to mutate
the endpointmap List
- rangeToEndPointMap.put(range, new
ArrayList<InetAddress>(Arrays.asList(endpoints)));
+ rangeToEndPointMap.put(range,
replicationStrategy_.getNaturalEndpoints(range.right()));
}
return rangeToEndPointMap;
}
@@ -359,8 +357,7 @@
Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range,
List<InetAddress>>();
for ( Range range : ranges )
{
- InetAddress[] endpoints =
replicationStrategy_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
- rangeToEndPointMap.put(range, new ArrayList<InetAddress>(
Arrays.asList(endpoints) ) );
+ rangeToEndPointMap.put(range,
replicationStrategy_.getNaturalEndpoints(range.right(), tokenToEndPointMap));
}
if (logger_.isDebugEnabled())
logger_.debug("Done constructing range to endpoint map ...");
@@ -755,9 +752,9 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public InetAddress[] getReadStorageEndPoints(String key)
+ public List<InetAddress> getNaturalEndpoints(String key)
{
- return
replicationStrategy_.getReadStorageEndPoints(partitioner_.getToken(key));
+ return
replicationStrategy_.getNaturalEndpoints(partitioner_.getToken(key));
}
/**
@@ -767,10 +764,10 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<InetAddress> getLiveReadStorageEndPoints(String key)
+ public List<InetAddress> getLiveNaturalEndpoints(String key)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
- InetAddress[] endpoints = getReadStorageEndPoints(key);
+ List<InetAddress> endpoints = getNaturalEndpoints(key);
for ( InetAddress endpoint : endpoints )
{
@@ -788,9 +785,9 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public Map<InetAddress, InetAddress> getHintedStorageEndpointMap(String
key, InetAddress[] naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedEndpointMap(String key,
List<InetAddress> naturalEndpoints)
{
- return
replicationStrategy_.getHintedStorageEndPoints(partitioner_.getToken(key),
naturalEndpoints);
+ return
replicationStrategy_.getHintedEndpoints(partitioner_.getToken(key),
naturalEndpoints);
}
/**
@@ -799,7 +796,7 @@
*/
public InetAddress findSuitableEndPoint(String key) throws IOException,
UnavailableException
{
- InetAddress[] endpoints = getReadStorageEndPoints(key);
+ List<InetAddress> endpoints = getNaturalEndpoints(key);
for(InetAddress endPoint: endpoints)
{
if(endPoint.equals(FBUtilities.getLocalAddress()))
@@ -808,24 +805,24 @@
}
}
int j = 0;
- for ( ; j < endpoints.length; ++j )
+ for ( ; j < endpoints.size(); ++j )
{
- if (
StorageService.instance().isInSameDataCenter(endpoints[j]) &&
FailureDetector.instance().isAlive(endpoints[j]))
+ if (
StorageService.instance().isInSameDataCenter(endpoints.get(j)) &&
FailureDetector.instance().isAlive(endpoints.get(j)))
{
- return endpoints[j];
+ return endpoints.get(j);
}
}
// We have tried to be really nice but looks like there are no
servers
// in the local data center that are alive and can service this
request so
// just send it to the first alive guy and see if we get
anything.
j = 0;
- for ( ; j < endpoints.length; ++j )
+ for ( ; j < endpoints.size(); ++j )
{
- if ( FailureDetector.instance().isAlive(endpoints[j]))
+ if (
FailureDetector.instance().isAlive(endpoints.get(j)))
{
if (logger_.isDebugEnabled())
- logger_.debug("InetAddress " + endpoints[j] +
" is alive so get data from it.");
- return endpoints[j];
+ logger_.debug("InetAddress " +
endpoints.get(j) + " is alive so get data from it.");
+ return endpoints.get(j);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
Wed Oct 21 19:18:43 2009
@@ -26,6 +26,8 @@
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.Table;
import java.net.InetAddress;
+import java.util.List;
+
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
@@ -40,7 +42,7 @@
*/
private static boolean checkIfProcessKey(String key)
{
- InetAddress[] endPoints =
StorageService.instance().getReadStorageEndPoints(key);
+ List<InetAddress> endPoints =
StorageService.instance().getNaturalEndpoints(key);
InetAddress localEndPoint = FBUtilities.getLocalAddress();
for(InetAddress endPoint : endPoints)
{
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
Wed Oct 21 19:18:43 2009
@@ -18,6 +18,8 @@
package org.apache.cassandra.client;
import java.net.InetAddress;
+import java.util.List;
+
import org.apache.cassandra.service.Cassandra;
import org.apache.cassandra.service.Column;
import org.apache.cassandra.service.ColumnPath;
@@ -65,14 +67,14 @@
String row = "row" + nRows;
ColumnPath col = new ColumnPath("Standard1", null,
"col1".getBytes());
- InetAddress endPoints[] = ringCache.getEndPoint(row);
+ List<InetAddress> endPoints = ringCache.getEndPoint(row);
String hosts="";
- for (int i=0; i<endPoints.length; i++)
- hosts = hosts + ((i>0) ? "," : "") + endPoints[i];
- System.out.println("hosts with key " + row + " : " + hosts + ";
choose " + endPoints[0]);
+ for (int i = 0; i < endPoints.size(); i++)
+ hosts = hosts + ((i > 0) ? "," : "") + endPoints.get(i);
+ System.out.println("hosts with key " + row + " : " + hosts + ";
choose " + endPoints.get(0));
// now, read the row back directly from the host owning the row
locally
- setup(endPoints[0].getHostAddress(),
DatabaseDescriptor.getThriftPort());
+ setup(endPoints.get(0).getHostAddress(),
DatabaseDescriptor.getThriftPort());
thriftClient.insert(table, row, col, "val1".getBytes(), 1, 1);
Column column=thriftClient.get(table, row, col, 1).column;
System.out.println("read row " + row + " " + new
String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Wed Oct 21 19:18:43 2009
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Arrays;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -44,7 +45,7 @@
*/
StorageService.instance().updateTokenMetadataUnsafe(newToken,
newEndPoint);
- BootStrapper b = new BootStrapper(new InetAddress[]{newEndPoint},
newToken );
+ BootStrapper b = new BootStrapper(Arrays.asList(newEndPoint), newToken
);
Map<Range,List<BootstrapSourceTarget>> res =
b.getRangesWithSourceTarget();
int transferCount = 0;
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=828148&r1=828147&r2=828148&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Wed Oct 21 19:18:43 2009
@@ -37,7 +37,7 @@
public class RackUnawareStrategyTest
{
@Test
- public void testBigIntegerStorageEndPoints() throws UnknownHostException
+ public void testBigIntegerEndpoints() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
@@ -49,11 +49,11 @@
endPointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
}
- testGetStorageEndPoints(tmd, strategy, endPointTokens.toArray(new
Token[0]), keyTokens.toArray(new Token[0]));
+ testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]),
keyTokens.toArray(new Token[0]));
}
@Test
- public void testStringStorageEndPoints() throws UnknownHostException
+ public void testStringEndpoints() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new OrderPreservingPartitioner();
@@ -65,12 +65,12 @@
endPointTokens.add(new StringToken(String.valueOf((char)('a' + i *
2))));
keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i *
2 + 1))));
}
- testGetStorageEndPoints(tmd, strategy, endPointTokens.toArray(new
Token[0]), keyTokens.toArray(new Token[0]));
+ testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]),
keyTokens.toArray(new Token[0]));
}
// given a list of endpoint tokens, and a set of key tokens falling
between the endpoint tokens,
// make sure that the Strategy picks the right endpoints for the keys.
- private void testGetStorageEndPoints(TokenMetadata tmd,
AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[]
keyTokens) throws UnknownHostException
+ private void testGetEndpoints(TokenMetadata tmd,
AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[]
keyTokens) throws UnknownHostException
{
List<InetAddress> hosts = new ArrayList<InetAddress>();
for (int i = 0; i < endPointTokens.length; i++)
@@ -82,17 +82,17 @@
for (int i = 0; i < keyTokens.length; i++)
{
- InetAddress[] endPoints =
strategy.getReadStorageEndPoints(keyTokens[i]);
- assertEquals(3, endPoints.length);
- for (int j = 0; j < endPoints.length; j++)
+ List<InetAddress> endPoints =
strategy.getNaturalEndpoints(keyTokens[i]);
+ assertEquals(3, endPoints.size());
+ for (int j = 0; j < endPoints.size(); j++)
{
- assertEquals(endPoints[j], hosts.get((i + j + 1) %
hosts.size()));
+ assertEquals(endPoints.get(j), hosts.get((i + j + 1) %
hosts.size()));
}
}
}
@Test
- public void testGetStorageEndPointsDuringBootstrap() throws
UnknownHostException
+ public void testGetEndpointsDuringBootstrap() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
@@ -122,18 +122,17 @@
for (int i = 0; i < keyTokens.length; i++)
{
- InetAddress[] endPoints =
strategy.getWriteStorageEndPoints(keyTokens[i],
strategy.getReadStorageEndPoints(keyTokens[i]));
- assertTrue(endPoints.length >=3);
- List<InetAddress> endPointsList = Arrays.asList(endPoints);
+ List<InetAddress> endPoints =
strategy.getWriteEndpoints(keyTokens[i],
strategy.getNaturalEndpoints(keyTokens[i]));
+ assertTrue(endPoints.size() >= 3);
for (int j = 0; j < 3; j++)
{
//Check that the old nodes are definitely included
- assertTrue(endPointsList.contains(hosts.get((i + j + 1) %
hosts.size())));
+ assertTrue(endPoints.contains(hosts.get((i + j + 1) %
hosts.size())));
}
// for 5, 15, 25 this should include bootstrap node
if (i < 3)
- assertTrue(endPointsList.contains(bootstrapEndPoint));
+ assertTrue(endPoints.contains(bootstrapEndPoint));
}
}
}