Author: jbellis
Date: Mon May 10 18:52:38 2010
New Revision: 942840
URL: http://svn.apache.org/viewvc?rev=942840&view=rev
Log:
vijay
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Modified:
cassandra/trunk/conf/datacenters.properties
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
Modified: cassandra/trunk/conf/datacenters.properties
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/datacenters.properties?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/conf/datacenters.properties (original)
+++ cassandra/trunk/conf/datacenters.properties Mon May 10 18:52:38 2010
@@ -14,9 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# datacenter=replication factor
# The sum of all the datacenter replication factor values should equal
# the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
-dc1=3
-dc2=5
-dc3=1
+
+# keyspace\:datacenter=replication factor
+Keyspace1\:dc1=3
+Keyspace1\:dc2=5
+keyspace1\:dc3=1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Mon May 10 18:52:38 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.util.Collection;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
@@ -37,15 +38,20 @@ import java.net.InetAddress;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.utils.FBUtilities.UTF8;
import org.apache.cassandra.utils.WrappedRunnable;
+import com.google.common.collect.Multimap;
+
/**
* For each table (keyspace), there is a row in the system hints CF.
@@ -104,7 +110,7 @@ public class HintedHandOffManager
}, "Hint delivery").start();
}
- private static boolean sendMessage(InetAddress endpoint, String tableName,
byte[] key) throws IOException
+ private static boolean sendMessage(InetAddress endpoint, String tableName,
byte[] key) throws IOException, UnavailableException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
{
@@ -126,8 +132,12 @@ public class HintedHandOffManager
rm.add(cf);
}
Message message = rm.makeRowMutationMessage();
- WriteResponseHandler responseHandler = new WriteResponseHandler(1,
tableName);
- MessagingService.instance.sendRR(message, new InetAddress[] { endpoint
}, responseHandler);
+ InetAddress [] endpoints = new InetAddress[] { endpoint };
+ AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(tableName);
+ List<InetAddress> endpointlist = Arrays.asList(endpoints);
+ Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(endpointlist);
+ WriteResponseHandler responseHandler = new
WriteResponseHandler(endpointlist, hintedEndpoints, ConsistencyLevel.ALL,
tableName);
+ MessagingService.instance.sendRR(message, endpoints, responseHandler);
try
{
@@ -154,8 +164,9 @@ public class HintedHandOffManager
rm.apply();
}
- /** hintStore must be the hints columnfamily from the system table */
- private static void deliverAllHints() throws DigestMismatchException,
IOException, InvalidRequestException, TimeoutException
+ /** hintStore must be the hints columnfamily from the system table
+ * @throws UnavailableException */
+ private static void deliverAllHints() throws DigestMismatchException,
IOException, InvalidRequestException, TimeoutException, UnavailableException
{
if (logger_.isDebugEnabled())
logger_.debug("Started deliverAllHints");
@@ -223,7 +234,7 @@ public class HintedHandOffManager
|| (hintColumnFamily.getSortedColumns().size() == 1 &&
hintColumnFamily.getColumn(startColumn) != null);
}
- private static void deliverHintsToEndpoint(InetAddress endpoint) throws
IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+ private static void deliverHintsToEndpoint(InetAddress endpoint) throws
IOException, DigestMismatchException, InvalidRequestException,
TimeoutException, UnavailableException
{
if (logger_.isDebugEnabled())
logger_.debug("Started hinted handoff for endpoint " + endpoint);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Mon May 10 18:52:38 2010
@@ -27,11 +27,15 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -41,7 +45,7 @@ public abstract class AbstractReplicatio
{
protected static final Logger logger_ =
LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- private TokenMetadata tokenMetadata_;
+ protected TokenMetadata tokenMetadata_;
protected final IEndpointSnitch snitch_;
AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch
snitch)
@@ -190,4 +194,8 @@ public abstract class AbstractReplicatio
return getAddressRanges(temp, table).get(pendingAddress);
}
+ public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel, String table)
+ {
+ return new QuorumResponseHandler(responseResolver, consistencyLevel,
table);
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
Mon May 10 18:52:38 2010
@@ -150,37 +150,36 @@ public class DatacenterShardStrategy ext
}
}
- private ArrayList<InetAddress> getNaturalEndpointsInternal(Token
searchToken, TokenMetadata metadata) throws IOException
+ private ArrayList<InetAddress> getNaturalEndpointsInternal(Token
searchToken, TokenMetadata metadata, String table) throws UnknownHostException
{
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
if (metadata.sortedTokens().size() == 0)
return endpoints;
- if (null == tokens || tokens.size() != metadata.sortedTokens().size())
+ if (tokensize != metadata.sortedTokens().size())
{
- loadEndpoints(metadata);
+ loadEndPoints(metadata);
}
- for (String dc : dcMap.keySet())
+ for (String dc : dcTokens.keySet())
{
- int replicas_ = dcReplicationFactor.get(dc);
- ArrayList<InetAddress> forloopReturn = new
ArrayList<InetAddress>(replicas_);
- List<Token> tokens = dcMap.get(dc);
+ int replicas_ = getReplicationFactor(dc, table);
+ List<Token> tokens = dcTokens.get(dc);
boolean bOtherRack = false;
boolean doneDataCenterItr;
// Add the node at the index by default
Iterator<Token> iter = TokenMetadata.ringIterator(tokens,
searchToken);
InetAddress primaryHost = metadata.getEndpoint(iter.next());
- forloopReturn.add(primaryHost);
+ endpoints.add(primaryHost);
- while (forloopReturn.size() < replicas_ && iter.hasNext())
+ while (endpoints.size() < replicas_ && iter.hasNext())
{
Token t = iter.next();
- InetAddress endpointOfInterest = metadata.getEndpoint(t);
- if (forloopReturn.size() < replicas_ - 1)
+ InetAddress endPointOfInterest = metadata.getEndpoint(t);
+ if (endpoints.size() < replicas_ - 1)
{
- forloopReturn.add(endpointOfInterest);
+ endpoints.add(endPointOfInterest);
continue;
}
else
@@ -191,10 +190,9 @@ public class DatacenterShardStrategy ext
// Now try to find one on a different rack
if (!bOtherRack)
{
- AbstractRackAwareSnitch snitch =
(AbstractRackAwareSnitch)snitch_;
- if
(!snitch.getRack(primaryHost).equals(snitch.getRack(endpointOfInterest)))
+ if
(!snitch.getRack(primaryHost).equals(snitch.getRack(endPointOfInterest)))
{
- forloopReturn.add(metadata.getEndpoint(t));
+ endpoints.add(metadata.getEndpoint(t));
bOtherRack = true;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
Mon May 10 18:52:38 2010
@@ -19,8 +19,6 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
/**
* A simple endpoint snitch implementation that assumes datacenter and rack
information is encoded
@@ -28,12 +26,12 @@ import java.util.*;
*/
public class RackInferringSnitch extends AbstractRackAwareSnitch
{
- public String getRack(InetAddress endpoint) throws UnknownHostException
+ public String getRack(InetAddress endpoint)
{
return Byte.toString(endpoint.getAddress()[2]);
}
- public String getDatacenter(InetAddress endpoint) throws
UnknownHostException
+ public String getDatacenter(InetAddress endpoint)
{
return Byte.toString(endpoint.getAddress()[1]);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Mon May 10 18:52:38 2010
@@ -112,7 +112,7 @@ class ConsistencyChecker implements Runn
if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
- IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
+ IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_);
IAsyncCallback responseHandler;
if (replicas_.contains(FBUtilities.getLocalAddress()))
responseHandler = new DataRepairHandler(row_,
replicas_.size(), readResponseResolver);
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=942840&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Mon May 10 18:52:38 2010
@@ -0,0 +1,63 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.locator.RackInferringSnitch;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Datacenter Quorum response handler will make sure to that all the responses
+ * recived are from the same dc and will validate it with the
+ *
+ * @author Vijay Parthasarathy
+ */
+public class DatacenterQuorumResponseHandler<T> extends
QuorumResponseHandler<T>
+{
+ private static final RackInferringSnitch snitch = (RackInferringSnitch)
DatabaseDescriptor.getEndpointSnitch();
+ private static final String localdc =
snitch.getDatacenter(FBUtilities.getLocalAddress());
+ private AtomicInteger localResponses;
+
+ public DatacenterQuorumResponseHandler(IResponseResolver<T>
responseResolver, ConsistencyLevel consistencyLevel, String table)
+ {
+ super(responseResolver, consistencyLevel, table);
+ localResponses = new AtomicInteger(blockfor);
+ }
+
+ @Override
+ public void response(Message message)
+ {
+ try
+ {
+ int b = -1;
+ responses.add(message);
+ // If DCQuorum/DCQuorumSync, check if the response is from the
local DC.
+ if (localdc.equals(snitch.getDatacenter(message.getFrom())))
+ {
+ b = localResponses.decrementAndGet();
+ } else {
+ b = localResponses.get();
+ }
+ if (b == 0 && responseResolver.isDataPresent(responses))
+ {
+ condition.signal();
+ }
+ }
+ catch (Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public int determineBlockFor(ConsistencyLevel consistency_level, String
table)
+ {
+ DatacenterShardStrategy stategy = (DatacenterShardStrategy)
StorageService.instance.getReplicationStrategy(table);
+ return (stategy.getReplicationFactor(localdc, table)/2) + 1;
+ }
+}
\ No newline at end of file
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
Mon May 10 18:52:38 2010
@@ -24,13 +24,19 @@ package org.apache.cassandra.service;
*/
+import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import com.google.common.collect.Multimap;
/**
* This class will block for the replication factor which is
@@ -39,58 +45,85 @@ import org.apache.cassandra.net.Message;
*/
public class DatacenterSyncWriteResponseHandler extends WriteResponseHandler
{
- private final Map<String, Integer> dcResponses = new HashMap<String,
Integer>();
- private final Map<String, Integer> responseCounts;
- private final AbstractRackAwareSnitch endpointSnitch;
+ private final DatacenterShardStrategy stategy =
(DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
+ private HashMap<String, AtomicInteger> dcResponses;
- public DatacenterSyncWriteResponseHandler(Map<String, Integer>
responseCounts, String table)
+ public DatacenterSyncWriteResponseHandler(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints,
ConsistencyLevel consistencyLevel, String table)
+ throws UnavailableException
{
// Response is been managed by the map so make it 1 for the superclass.
- super(1, table);
- this.responseCounts = responseCounts;
- endpointSnitch = (AbstractRackAwareSnitch)
DatabaseDescriptor.getEndpointSnitch();
+ super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
}
- @Override
// synchronized for the benefit of dcResponses and responseCounts.
"responses" itself
// is inherited from WRH and is concurrent.
- // TODO can we use concurrent structures instead?
- public synchronized void response(Message message)
+ @Override
+ public void response(Message message)
{
+ responses.add(message);
try
{
- String dataCenter =
endpointSnitch.getDatacenter(message.getFrom());
- Object blockFor = responseCounts.get(dataCenter);
+ String dataCenter =
endpointsnitch.getDatacenter(message.getFrom());
// If this DC needs to be blocked then do the below.
- if (blockFor != null)
- {
- Integer quorumCount = dcResponses.get(dataCenter);
- if (quorumCount == null)
- {
- // Intialize and recognize the first response
- dcResponses.put(dataCenter, 1);
- }
- else if ((Integer) blockFor > quorumCount)
- {
- // recognize the consequtive responses.
- dcResponses.put(dataCenter, quorumCount + 1);
- }
- else
- {
- // No need to wait on it anymore so remove it.
- responseCounts.remove(dataCenter);
- }
- }
+ dcResponses.get(dataCenter).getAndDecrement();
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
- responses.add(message);
- // If done then the response count will be empty
- if (responseCounts.isEmpty())
- {
- condition.signal();
+ maybeSignal();
+ }
+
+ private void maybeSignal()
+ {
+ for(AtomicInteger i : dcResponses.values()) {
+ if (0 < i.get()) {
+ return;
+ }
+ }
+ // If all the quorum conditionas are met then return back.
+ condition.signal();
+ }
+
+ @Override
+ public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+ {
+ this.dcResponses = new HashMap<String, AtomicInteger>();
+ for (String dc: stategy.getDatacenters(table)) {
+ int rf = stategy.getReplicationFactor(dc, table);
+ dcResponses.put(dc, new AtomicInteger((rf/2) + 1));
+ }
+ // Do nothing, there is no 'one' integer to block for
+ return 0;
+ }
+
+ @Override
+ public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints) throws
UnavailableException
+ {
+ Map<String, AtomicInteger> dcEndpoints = new HashMap<String,
AtomicInteger>();
+ try
+ {
+ for (String dc: stategy.getDatacenters(table))
+ dcEndpoints.put(dc, new AtomicInteger());
+ for (InetAddress destination : hintedEndpoints.keySet())
+ {
+ // If not just go to the next endpoint
+ if (!writeEndpoints.contains(destination))
+ continue;
+ // figure out the destination dc
+ String destinationDC =
endpointsnitch.getDatacenter(destination);
+
dcEndpoints.get(destinationDC).incrementAndGet();
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ throw new UnavailableException();
+ }
+ // Throw exception if any of the DC doesnt have livenodes to accept
write.
+ for (String dc: stategy.getDatacenters(table)) {
+ if (dcEndpoints.get(dc).get() != dcResponses.get(dc).get()) {
+ throw new UnavailableException();
+ }
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
Mon May 10 18:52:38 2010
@@ -26,13 +26,19 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.locator.RackInferringSnitch;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.collect.Multimap;
+
/**
* This class will basically will block for the replication factor which is
* provided in the input map. it will block till we recive response from (DC,
n)
@@ -40,27 +46,26 @@ import org.apache.cassandra.utils.FBUtil
*/
public class DatacenterWriteResponseHandler extends WriteResponseHandler
{
- private final AtomicInteger blockFor;
- private final AbstractRackAwareSnitch endpointsnitch;
- private final InetAddress localEndpoint;
+ private static final RackInferringSnitch snitch = (RackInferringSnitch)
DatabaseDescriptor.getEndpointSnitch();
+ private static final String localdc =
snitch.getDatacenter(FBUtilities.getLocalAddress());
+ private final AtomicInteger blockFor;
- public DatacenterWriteResponseHandler(int blockFor, String table)
+ public DatacenterWriteResponseHandler(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints,
ConsistencyLevel consistencyLevel, String table) throws UnavailableException
{
// Response is been managed by the map so the waitlist size really
doesnt matter.
- super(blockFor, table);
- this.blockFor = new AtomicInteger(blockFor);
- endpointsnitch = (AbstractRackAwareSnitch)
DatabaseDescriptor.getEndpointSnitch();
- localEndpoint = FBUtilities.getLocalAddress();
+ super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ blockFor = new AtomicInteger(responseCount);
}
@Override
public void response(Message message)
{
+ responses.add(message);
//Is optimal to check if same datacenter than comparing Arrays.
int b = -1;
try
{
- if
(endpointsnitch.getDatacenter(localEndpoint).equals(endpointsnitch.getDatacenter(message.getFrom())))
+ if
(localdc.equals(endpointsnitch.getDatacenter(message.getFrom())))
{
b = blockFor.decrementAndGet();
}
@@ -69,7 +74,6 @@ public class DatacenterWriteResponseHand
{
throw new RuntimeException(e);
}
- responses.add(message);
if (b == 0)
{
//Singnal when Quorum is recived.
@@ -78,4 +82,35 @@ public class DatacenterWriteResponseHand
if (logger.isDebugEnabled())
logger.debug("Processed Message: " + message.toString());
}
-}
+
+ @Override
+ public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
+ throws UnavailableException
+ {
+ int liveNodes = 0;
+ try {
+ // count destinations that are part of the desired target set
+ for (InetAddress destination : hintedEndpoints.keySet())
+ {
+ if (writeEndpoints.contains(destination) &&
localdc.equals(endpointsnitch.getDatacenter(destination)))
+ liveNodes++;
+ }
+ } catch (Exception ex) {
+ throw new UnavailableException();
+ }
+ if (liveNodes < responseCount)
+ {
+ throw new UnavailableException();
+ }
+ }
+
+ @Override
+ public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+ {
+ DatacenterShardStrategy stategy = (DatacenterShardStrategy)
StorageService.instance.getReplicationStrategy(table);
+ if (consistencyLevel.equals(ConsistencyLevel.DCQUORUM)) {
+ return (stategy.getReplicationFactor(localdc, table)/2) + 1;
+ }
+ return super.determineBlockFor(writeEndpoints);
+ }
+}
\ No newline at end of file
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Mon May 10 18:52:38 2010
@@ -19,9 +19,6 @@
package org.apache.cassandra.service;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -31,6 +28,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.SimpleCondition;
import org.slf4j.Logger;
@@ -40,15 +38,19 @@ public class QuorumResponseHandler<T> im
{
protected static final Logger logger = LoggerFactory.getLogger(
QuorumResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
- protected final Collection<Message> responses;
- private IResponseResolver<T> responseResolver;
+ protected final Collection<Message> responses = new
LinkedBlockingQueue<Message>();;
+ protected IResponseResolver<T> responseResolver;
private final long startTime;
-
- public QuorumResponseHandler(int responseCount, IResponseResolver<T>
responseResolver)
+ protected int blockfor;
+
+ /**
+ * Constructor when response count has to be calculated and blocked for.
+ */
+ public QuorumResponseHandler(IResponseResolver<T> responseResolver,
ConsistencyLevel consistencyLevel, String table)
{
- responses = new LinkedBlockingQueue<Message>();
+ this.blockfor = determineBlockFor(consistencyLevel, table);
this.responseResolver = responseResolver;
- startTime = System.currentTimeMillis();
+ this.startTime = System.currentTimeMillis();
}
public T get() throws TimeoutException, DigestMismatchException,
IOException
@@ -90,9 +92,28 @@ public class QuorumResponseHandler<T> im
public void response(Message message)
{
responses.add(message);
+ if (responses.size() < blockfor) {
+ return;
+ }
if (responseResolver.isDataPresent(responses))
{
condition.signal();
}
}
+
+ public int determineBlockFor(ConsistencyLevel consistencyLevel, String
table)
+ {
+ switch (consistencyLevel)
+ {
+ case ONE:
+ case ANY:
+ return 1;
+ case QUORUM:
+ return (DatabaseDescriptor.getQuorum(table)/ 2) + 1;
+ case ALL:
+ return DatabaseDescriptor.getReplicationFactor(table);
+ default:
+ throw new UnsupportedOperationException("invalid consistency
level: " + table.toString());
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Mon May 10 18:52:38 2010
@@ -30,7 +30,6 @@ import org.apache.cassandra.db.*;
import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,17 +42,12 @@ public class ReadResponseResolver implem
{
private static Logger logger_ =
LoggerFactory.getLogger(ReadResponseResolver.class);
private final String table;
- private final int responseCount;
- public ReadResponseResolver(String table, int responseCount)
+ public ReadResponseResolver(String table)
{
- assert 1 <= responseCount && responseCount <=
DatabaseDescriptor.getReplicationFactor(table)
- : "invalid response count " + responseCount;
-
- this.responseCount = responseCount;
this.table = table;
}
-
+
/*
* This method for resolving read data should look at the timestamps of
each
* of the columns that are read and should pick up columns with the latest
@@ -159,9 +153,6 @@ public class ReadResponseResolver implem
public boolean isDataPresent(Collection<Message> responses)
{
- if (responses.size() < responseCount)
- return false;
-
boolean isDataPresent = false;
for (Message response : responses)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon
May 10 18:52:38 2010
@@ -213,13 +213,9 @@ public class StorageProxy implements Sto
List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
Collection<InetAddress> writeEndpoints =
rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table,
naturalEndpoints);
Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
- int blockFor = determineBlockFor(writeEndpoints.size(),
consistency_level);
-
- // avoid starting a write we know can't achieve the required
consistency
- assureSufficientLiveNodes(blockFor, writeEndpoints,
hintedEndpoints, consistency_level);
// send out the writes, as in mutate() above, but this time
with a callback that tracks responses
- final WriteResponseHandler responseHandler =
ss.getWriteResponseHandler(blockFor, consistency_level, table);
+ final WriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level,
table);
responseHandlers.add(responseHandler);
Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
@@ -287,29 +283,6 @@ public class StorageProxy implements Sto
}
- private static void assureSufficientLiveNodes(int blockFor,
Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress>
hintedEndpoints, ConsistencyLevel consistencyLevel)
- throws UnavailableException
- {
- if (consistencyLevel == ConsistencyLevel.ANY)
- {
- // ensure there are blockFor distinct living nodes (hints are ok).
- if (hintedEndpoints.keySet().size() < blockFor)
- throw new UnavailableException();
- }
-
- // count destinations that are part of the desired target set
- int liveNodes = 0;
- for (InetAddress destination : hintedEndpoints.keySet())
- {
- if (writeEndpoints.contains(destination))
- liveNodes++;
- }
- if (liveNodes < blockFor)
- {
- throw new UnavailableException();
- }
- }
-
private static void insertLocalMessage(final RowMutation rm, final
WriteResponseHandler responseHandler)
{
if (logger.isDebugEnabled())
@@ -325,26 +298,6 @@ public class StorageProxy implements Sto
StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
}
- private static int determineBlockFor(int expandedTargets, ConsistencyLevel
consistency_level)
- {
- switch (consistency_level)
- {
- case ONE:
- case ANY:
- return 1;
- case QUORUM:
- return (expandedTargets / 2) + 1;
- case DCQUORUM:
- case DCQUORUMSYNC:
- // TODO this is broken
- return expandedTargets;
- case ALL:
- return expandedTargets;
- default:
- throw new UnsupportedOperationException("invalid consistency
level " + consistency_level);
- }
- }
-
/**
* Read the data from one replica. When we get
* the data we perform consistency checks and figure out if any repairs
need to be done to the replicas.
@@ -461,10 +414,6 @@ public class StorageProxy implements Sto
InetAddress dataPoint =
StorageService.instance.findSuitableEndpoint(command.table, command.key);
List<InetAddress> endpointList =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- final String table = command.table;
- int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
consistency_level);
- if (endpointList.size() < responseCount)
- throw new UnavailableException();
InetAddress[] endpoints = new InetAddress[endpointList.size()];
Message messages[] = new Message[endpointList.size()];
@@ -479,7 +428,9 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
- QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new
ReadResponseResolver(command.table, responseCount));
+ AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.table);
+ QuorumResponseHandler<Row> quorumResponseHandler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
+
consistency_level, command.table);
MessagingService.instance.sendRR(messages, endpoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndpoints.add(endpoints);
@@ -503,10 +454,10 @@ public class StorageProxy implements Sto
{
if (randomlyReadRepair(command))
{
- IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table,
DatabaseDescriptor.getQuorum(command.table));
- QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getQuorum(command.table),
- readResponseResolverRepair);
+ IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table);
+ AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.table);
+ QuorumResponseHandler<Row> quorumResponseHandlerRepair =
rs.getQuorumResponseHandler(readResponseResolverRepair,
ConsistencyLevel.QUORUM,
+
command.table);
logger.info("DigestMismatchException: " + ex.getMessage());
Message messageRepair = command.makeReadMessage();
MessagingService.instance.sendRR(messageRepair,
commandEndpoints.get(commandIndex), quorumResponseHandlerRepair);
@@ -566,9 +517,7 @@ public class StorageProxy implements Sto
long startTime = System.nanoTime();
final String table = command.keyspace;
- int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
consistency_level);
-
- List<Pair<AbstractBounds, List<InetAddress>>> ranges =
getRestrictedRanges(command.range, command.keyspace, responseCount);
+ List<Pair<AbstractBounds, List<InetAddress>>> ranges =
getRestrictedRanges(command.range, command.keyspace);
// now scan until we have enough results
List<Row> rows = new ArrayList<Row>(command.max_keys);
@@ -581,8 +530,8 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, endpoints);
- QuorumResponseHandler<List<Row>> handler = new
QuorumResponseHandler<List<Row>>(responseCount, resolver);
-
+ AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(table);
+ QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level, table);
for (InetAddress endpoint : endpoints)
{
MessagingService.instance.sendRR(message, endpoint, handler);
@@ -678,7 +627,7 @@ public class StorageProxy implements Sto
* D, but we don't want any other results from it until after the (D,
T] range. Unwrapping so that
* the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
*/
- private static List<Pair<AbstractBounds, List<InetAddress>>>
getRestrictedRanges(AbstractBounds queryRange, String keyspace, int
responseCount)
+ private static List<Pair<AbstractBounds, List<InetAddress>>>
getRestrictedRanges(AbstractBounds queryRange, String keyspace)
throws UnavailableException
{
TokenMetadata tokenMetadata =
StorageService.instance.getTokenMetadata();
@@ -689,11 +638,8 @@ public class StorageProxy implements Sto
Token nodeToken = iter.next();
Range nodeRange = new
Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
- if (endpoints.size() < responseCount)
- throw new UnavailableException();
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
- List<InetAddress> endpointsForCL = endpoints.subList(0,
responseCount);
Set<AbstractBounds> restrictedRanges =
queryRange.restrictTo(nodeRange);
for (AbstractBounds range : restrictedRanges)
{
@@ -701,7 +647,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("Adding to restricted ranges " +
unwrapped + " for " + nodeRange);
- ranges.add(new Pair<AbstractBounds,
List<InetAddress>>(unwrapped, endpointsForCL));
+ ranges.add(new Pair<AbstractBounds,
List<InetAddress>>(unwrapped, endpoints));
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon May 10 18:52:38 2010
@@ -50,7 +50,6 @@ import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -1521,11 +1520,6 @@ public class StorageService implements I
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter +
token.toString()));
}
- public WriteResponseHandler getWriteResponseHandler(int blockFor,
ConsistencyLevel consistency_level, String table)
- {
- return getReplicationStrategy(table).getWriteResponseHandler(blockFor,
consistency_level, table);
- }
-
public boolean isClientMode()
{
return isClientMode;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Mon May 10 18:52:38 2010
@@ -19,41 +19,45 @@
package org.apache.cassandra.service;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.io.IOException;
+import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Multimap;
+
public class WriteResponseHandler implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger(
WriteResponseHandler.class );
+ protected static final AbstractRackAwareSnitch endpointsnitch =
(AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
protected final SimpleCondition condition = new SimpleCondition();
- private final int responseCount;
+ protected final int responseCount;
protected final Collection<Message> responses;
protected AtomicInteger localResponses = new AtomicInteger(0);
- private final long startTime;
+ protected final long startTime;
+ protected final ConsistencyLevel consistencyLevel;
+ protected final String table;
- public WriteResponseHandler(int responseCount, String table)
+ public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel
consistencyLevel, String table)
+ throws UnavailableException
{
- // at most one node per range can bootstrap at a time, and these will
be added to the write until
- // bootstrap finishes (at which point we no longer need to write to
the old ones).
- assert 1 <= responseCount && responseCount <= 2 *
DatabaseDescriptor.getReplicationFactor(table)
- : "invalid response count " + responseCount;
-
- this.responseCount = responseCount;
+ this.table = table;
+ this.consistencyLevel = consistencyLevel;
+ this.responseCount = determineBlockFor(writeEndpoints);
+ assureSufficientLiveNodes(writeEndpoints, hintedEndpoints);
responses = new LinkedBlockingQueue<Message>();
startTime = System.currentTimeMillis();
}
@@ -106,4 +110,54 @@ public class WriteResponseHandler implem
condition.signal();
}
}
-}
+
+ public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+ {
+ int blockFor = 0;
+ switch (consistencyLevel)
+ {
+ case ONE:
+ blockFor = 1;
+ break;
+ case ANY:
+ blockFor = 1;
+ break;
+ case QUORUM:
+ blockFor = (writeEndpoints.size() / 2) + 1;
+ break;
+ case ALL:
+ blockFor = writeEndpoints.size();
+ break;
+ default:
+ throw new UnsupportedOperationException("invalid consistency
level: " + consistencyLevel.toString());
+ }
+ // at most one node per range can bootstrap at a time, and these will
be added to the write until
+ // bootstrap finishes (at which point we no longer need to write to
the old ones).
+ assert 1 <= blockFor && blockFor <= 2 *
DatabaseDescriptor.getReplicationFactor(table)
+ : "invalid response count " + responseCount;
+ return blockFor;
+ }
+
+ public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
+ throws UnavailableException
+ {
+ if (consistencyLevel == ConsistencyLevel.ANY)
+ {
+ // ensure there are blockFor distinct living nodes (hints are ok).
+ if (hintedEndpoints.keySet().size() < responseCount)
+ throw new UnavailableException();
+ }
+
+ // count destinations that are part of the desired target set
+ int liveNodes = 0;
+ for (InetAddress destination : hintedEndpoints.keySet())
+ {
+ if (writeEndpoints.contains(destination))
+ liveNodes++;
+ }
+ if (liveNodes < responseCount)
+ {
+ throw new UnavailableException();
+ }
+ }
+}
\ No newline at end of file
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
Mon May 10 18:52:38 2010
@@ -1,18 +1,31 @@
package org.apache.cassandra.locator;
-import org.junit.Test;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import javax.xml.parsers.ParserConfigurationException;
-import org.apache.cassandra.config.ConfigurationException;
+import org.junit.Test;
+import org.xml.sax.SAXException;
public class DatacenterStrategyTest
{
- @Test
- public void testProperties() throws ConfigurationException
+ private String table = "Keyspace1";
+
+ @Test
+ public void testProperties() throws IOException,
ParserConfigurationException, SAXException
{
- DatacenterShardStrategy strategy = new DatacenterShardStrategy(new
TokenMetadata(), new RackInferringSnitch());
- assert strategy.getReplicationFactor("dc1") == 3;
- assert strategy.getReplicationFactor("dc2") == 5;
- assert strategy.getReplicationFactor("dc3") == 1;
+ XMLFileSnitch snitch = new XMLFileSnitch();
+ TokenMetadata metadata = new TokenMetadata();
+ InetAddress localhost = InetAddress.getLocalHost();
+ // Set the localhost to the tokenmetadata. Embeded cassandra way?
+ // metadata.addBootstrapToken();
+ DatacenterShardStrategy strategy = new DatacenterShardStrategy(new
TokenMetadata(), snitch);
+ assert strategy.getReplicationFactor("dc1", table) == 3;
+ assert strategy.getReplicationFactor("dc2", table) == 5;
+ assert strategy.getReplicationFactor("dc3", table) == 1;
+ // Query for the natural hosts
+ // strategy.getNaturalEndpoints(token, table)
}
}