Author: jbellis
Date: Mon May 10 18:53:49 2010
New Revision: 942842
URL: http://svn.apache.org/viewvc?rev=942842&view=rev
Log:
revert unreviewed patch to CASSANDRA-952
Removed:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Modified:
cassandra/trunk/conf/datacenters.properties
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
Modified: cassandra/trunk/conf/datacenters.properties
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/datacenters.properties?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/conf/datacenters.properties (original)
+++ cassandra/trunk/conf/datacenters.properties Mon May 10 18:53:49 2010
@@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+# datacenter=replication factor
# The sum of all the datacenter replication factor values should equal
# the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
-
-# keyspace\:datacenter=replication factor
-Keyspace1\:dc1=3
-Keyspace1\:dc2=5
-keyspace1\:dc3=1
+dc1=3
+dc2=5
+dc3=1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Mon May 10 18:53:49 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.util.Collection;
import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
@@ -38,20 +37,15 @@ import java.net.InetAddress;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.utils.FBUtilities.UTF8;
import org.apache.cassandra.utils.WrappedRunnable;
-import com.google.common.collect.Multimap;
-
/**
* For each table (keyspace), there is a row in the system hints CF.
@@ -110,7 +104,7 @@ public class HintedHandOffManager
}, "Hint delivery").start();
}
- private static boolean sendMessage(InetAddress endpoint, String tableName,
byte[] key) throws IOException, UnavailableException
+ private static boolean sendMessage(InetAddress endpoint, String tableName,
byte[] key) throws IOException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
{
@@ -132,12 +126,8 @@ public class HintedHandOffManager
rm.add(cf);
}
Message message = rm.makeRowMutationMessage();
- InetAddress [] endpoints = new InetAddress[] { endpoint };
- AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(tableName);
- List<InetAddress> endpointlist = Arrays.asList(endpoints);
- Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(endpointlist);
- WriteResponseHandler responseHandler = new
WriteResponseHandler(endpointlist, hintedEndpoints, ConsistencyLevel.ALL,
tableName);
- MessagingService.instance.sendRR(message, endpoints, responseHandler);
+ WriteResponseHandler responseHandler = new WriteResponseHandler(1,
tableName);
+ MessagingService.instance.sendRR(message, new InetAddress[] { endpoint
}, responseHandler);
try
{
@@ -164,9 +154,8 @@ public class HintedHandOffManager
rm.apply();
}
- /** hintStore must be the hints columnfamily from the system table
- * @throws UnavailableException */
- private static void deliverAllHints() throws DigestMismatchException,
IOException, InvalidRequestException, TimeoutException, UnavailableException
+ /** hintStore must be the hints columnfamily from the system table */
+ private static void deliverAllHints() throws DigestMismatchException,
IOException, InvalidRequestException, TimeoutException
{
if (logger_.isDebugEnabled())
logger_.debug("Started deliverAllHints");
@@ -234,7 +223,7 @@ public class HintedHandOffManager
|| (hintColumnFamily.getSortedColumns().size() == 1 &&
hintColumnFamily.getColumn(startColumn) != null);
}
- private static void deliverHintsToEndpoint(InetAddress endpoint) throws
IOException, DigestMismatchException, InvalidRequestException,
TimeoutException, UnavailableException
+ private static void deliverHintsToEndpoint(InetAddress endpoint) throws
IOException, DigestMismatchException, InvalidRequestException, TimeoutException
{
if (logger_.isDebugEnabled())
logger_.debug("Started hinted handoff for endpoint " + endpoint);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Mon May 10 18:53:49 2010
@@ -27,15 +27,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.service.IResponseResolver;
-import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -45,7 +41,7 @@ public abstract class AbstractReplicatio
{
protected static final Logger logger_ =
LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- protected TokenMetadata tokenMetadata_;
+ private TokenMetadata tokenMetadata_;
protected final IEndpointSnitch snitch_;
AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch
snitch)
@@ -194,8 +190,4 @@ public abstract class AbstractReplicatio
return getAddressRanges(temp, table).get(pendingAddress);
}
- public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel, String table)
- {
- return new QuorumResponseHandler(responseResolver, consistencyLevel,
table);
- }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
Mon May 10 18:53:49 2010
@@ -150,36 +150,37 @@ public class DatacenterShardStrategy ext
}
}
- private ArrayList<InetAddress> getNaturalEndpointsInternal(Token
searchToken, TokenMetadata metadata, String table) throws UnknownHostException
+ private ArrayList<InetAddress> getNaturalEndpointsInternal(Token
searchToken, TokenMetadata metadata) throws IOException
{
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
if (metadata.sortedTokens().size() == 0)
return endpoints;
- if (tokensize != metadata.sortedTokens().size())
+ if (null == tokens || tokens.size() != metadata.sortedTokens().size())
{
- loadEndPoints(metadata);
+ loadEndpoints(metadata);
}
- for (String dc : dcTokens.keySet())
+ for (String dc : dcMap.keySet())
{
- int replicas_ = getReplicationFactor(dc, table);
- List<Token> tokens = dcTokens.get(dc);
+ int replicas_ = dcReplicationFactor.get(dc);
+ ArrayList<InetAddress> forloopReturn = new
ArrayList<InetAddress>(replicas_);
+ List<Token> tokens = dcMap.get(dc);
boolean bOtherRack = false;
boolean doneDataCenterItr;
// Add the node at the index by default
Iterator<Token> iter = TokenMetadata.ringIterator(tokens,
searchToken);
InetAddress primaryHost = metadata.getEndpoint(iter.next());
- endpoints.add(primaryHost);
+ forloopReturn.add(primaryHost);
- while (endpoints.size() < replicas_ && iter.hasNext())
+ while (forloopReturn.size() < replicas_ && iter.hasNext())
{
Token t = iter.next();
- InetAddress endPointOfInterest = metadata.getEndpoint(t);
- if (endpoints.size() < replicas_ - 1)
+ InetAddress endpointOfInterest = metadata.getEndpoint(t);
+ if (forloopReturn.size() < replicas_ - 1)
{
- endpoints.add(endPointOfInterest);
+ forloopReturn.add(endpointOfInterest);
continue;
}
else
@@ -190,9 +191,10 @@ public class DatacenterShardStrategy ext
// Now try to find one on a different rack
if (!bOtherRack)
{
- if
(!snitch.getRack(primaryHost).equals(snitch.getRack(endPointOfInterest)))
+ AbstractRackAwareSnitch snitch =
(AbstractRackAwareSnitch)snitch_;
+ if
(!snitch.getRack(primaryHost).equals(snitch.getRack(endpointOfInterest)))
{
- endpoints.add(metadata.getEndpoint(t));
+ forloopReturn.add(metadata.getEndpoint(t));
bOtherRack = true;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
Mon May 10 18:53:49 2010
@@ -19,6 +19,8 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
/**
* A simple endpoint snitch implementation that assumes datacenter and rack
information is encoded
@@ -26,12 +28,12 @@ import java.net.InetAddress;
*/
public class RackInferringSnitch extends AbstractRackAwareSnitch
{
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddress endpoint) throws UnknownHostException
{
return Byte.toString(endpoint.getAddress()[2]);
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddress endpoint) throws
UnknownHostException
{
return Byte.toString(endpoint.getAddress()[1]);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Mon May 10 18:53:49 2010
@@ -112,7 +112,7 @@ class ConsistencyChecker implements Runn
if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
- IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_);
+ IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
IAsyncCallback responseHandler;
if (replicas_.contains(FBUtilities.getLocalAddress()))
responseHandler = new DataRepairHandler(row_,
replicas_.size(), readResponseResolver);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
Mon May 10 18:53:49 2010
@@ -24,19 +24,13 @@ package org.apache.cassandra.service;
*/
-import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
-
-import com.google.common.collect.Multimap;
/**
* This class will block for the replication factor which is
@@ -45,85 +39,58 @@ import com.google.common.collect.Multima
*/
public class DatacenterSyncWriteResponseHandler extends WriteResponseHandler
{
- private final DatacenterShardStrategy stategy =
(DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
- private HashMap<String, AtomicInteger> dcResponses;
+ private final Map<String, Integer> dcResponses = new HashMap<String,
Integer>();
+ private final Map<String, Integer> responseCounts;
+ private final AbstractRackAwareSnitch endpointSnitch;
- public DatacenterSyncWriteResponseHandler(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints,
ConsistencyLevel consistencyLevel, String table)
- throws UnavailableException
+ public DatacenterSyncWriteResponseHandler(Map<String, Integer>
responseCounts, String table)
{
        // Responses are managed by the map, so make it 1 for the superclass.
- super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ super(1, table);
+ this.responseCounts = responseCounts;
+ endpointSnitch = (AbstractRackAwareSnitch)
DatabaseDescriptor.getEndpointSnitch();
}
+ @Override
// synchronized for the benefit of dcResponses and responseCounts.
"responses" itself
// is inherited from WRH and is concurrent.
- @Override
- public void response(Message message)
+ // TODO can we use concurrent structures instead?
+ public synchronized void response(Message message)
{
- responses.add(message);
try
{
- String dataCenter =
endpointsnitch.getDatacenter(message.getFrom());
+ String dataCenter =
endpointSnitch.getDatacenter(message.getFrom());
+ Object blockFor = responseCounts.get(dataCenter);
// If this DC needs to be blocked then do the below.
- dcResponses.get(dataCenter).getAndDecrement();
+ if (blockFor != null)
+ {
+ Integer quorumCount = dcResponses.get(dataCenter);
+ if (quorumCount == null)
+ {
+                    // Initialize and recognize the first response
+ dcResponses.put(dataCenter, 1);
+ }
+ else if ((Integer) blockFor > quorumCount)
+ {
+                    // recognize the consecutive responses.
+ dcResponses.put(dataCenter, quorumCount + 1);
+ }
+ else
+ {
+ // No need to wait on it anymore so remove it.
+ responseCounts.remove(dataCenter);
+ }
+ }
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
- maybeSignal();
- }
-
- private void maybeSignal()
- {
- for(AtomicInteger i : dcResponses.values()) {
- if (0 < i.get()) {
- return;
- }
- }
- // If all the quorum conditionas are met then return back.
- condition.signal();
- }
-
- @Override
- public int determineBlockFor(Collection<InetAddress> writeEndpoints)
- {
- this.dcResponses = new HashMap<String, AtomicInteger>();
- for (String dc: stategy.getDatacenters(table)) {
- int rf = stategy.getReplicationFactor(dc, table);
- dcResponses.put(dc, new AtomicInteger((rf/2) + 1));
- }
- // Do nothing, there is no 'one' integer to block for
- return 0;
- }
-
- @Override
- public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints) throws
UnavailableException
- {
- Map<String, AtomicInteger> dcEndpoints = new HashMap<String,
AtomicInteger>();
- try
- {
- for (String dc: stategy.getDatacenters(table))
- dcEndpoints.put(dc, new AtomicInteger());
- for (InetAddress destination : hintedEndpoints.keySet())
- {
- // If not just go to the next endpoint
- if (!writeEndpoints.contains(destination))
- continue;
- // figure out the destination dc
- String destinationDC =
endpointsnitch.getDatacenter(destination);
-
dcEndpoints.get(destinationDC).incrementAndGet();
- }
- }
- catch (UnknownHostException e)
- {
- throw new UnavailableException();
- }
- // Throw exception if any of the DC doesnt have livenodes to accept
write.
- for (String dc: stategy.getDatacenters(table)) {
- if (dcEndpoints.get(dc).get() != dcResponses.get(dc).get()) {
- throw new UnavailableException();
- }
+ responses.add(message);
+ // If done then the response count will be empty
+ if (responseCounts.isEmpty())
+ {
+ condition.signal();
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
Mon May 10 18:53:49 2010
@@ -26,19 +26,13 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.DatacenterShardStrategy;
-import org.apache.cassandra.locator.RackInferringSnitch;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
-import com.google.common.collect.Multimap;
-
/**
 * This class will block for the replication factor which is
 * provided in the input map. It will block till we receive responses from (DC,
n)
@@ -46,26 +40,27 @@ import com.google.common.collect.Multima
*/
public class DatacenterWriteResponseHandler extends WriteResponseHandler
{
- private static final RackInferringSnitch snitch = (RackInferringSnitch)
DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc =
snitch.getDatacenter(FBUtilities.getLocalAddress());
- private final AtomicInteger blockFor;
+ private final AtomicInteger blockFor;
+ private final AbstractRackAwareSnitch endpointsnitch;
+ private final InetAddress localEndpoint;
- public DatacenterWriteResponseHandler(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints,
ConsistencyLevel consistencyLevel, String table) throws UnavailableException
+ public DatacenterWriteResponseHandler(int blockFor, String table)
{
        // Responses are managed by the map, so the waitlist size really
doesn't matter.
- super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
- blockFor = new AtomicInteger(responseCount);
+ super(blockFor, table);
+ this.blockFor = new AtomicInteger(blockFor);
+ endpointsnitch = (AbstractRackAwareSnitch)
DatabaseDescriptor.getEndpointSnitch();
+ localEndpoint = FBUtilities.getLocalAddress();
}
@Override
public void response(Message message)
{
- responses.add(message);
        // It is faster to check whether the datacenter is the same than to compare arrays.
int b = -1;
try
{
- if
(localdc.equals(endpointsnitch.getDatacenter(message.getFrom())))
+ if
(endpointsnitch.getDatacenter(localEndpoint).equals(endpointsnitch.getDatacenter(message.getFrom())))
{
b = blockFor.decrementAndGet();
}
@@ -74,6 +69,7 @@ public class DatacenterWriteResponseHand
{
throw new RuntimeException(e);
}
+ responses.add(message);
if (b == 0)
{
            // Signal when quorum is received.
@@ -82,35 +78,4 @@ public class DatacenterWriteResponseHand
if (logger.isDebugEnabled())
logger.debug("Processed Message: " + message.toString());
}
-
- @Override
- public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
- throws UnavailableException
- {
- int liveNodes = 0;
- try {
- // count destinations that are part of the desired target set
- for (InetAddress destination : hintedEndpoints.keySet())
- {
- if (writeEndpoints.contains(destination) &&
localdc.equals(endpointsnitch.getDatacenter(destination)))
- liveNodes++;
- }
- } catch (Exception ex) {
- throw new UnavailableException();
- }
- if (liveNodes < responseCount)
- {
- throw new UnavailableException();
- }
- }
-
- @Override
- public int determineBlockFor(Collection<InetAddress> writeEndpoints)
- {
- DatacenterShardStrategy stategy = (DatacenterShardStrategy)
StorageService.instance.getReplicationStrategy(table);
- if (consistencyLevel.equals(ConsistencyLevel.DCQUORUM)) {
- return (stategy.getReplicationFactor(localdc, table)/2) + 1;
- }
- return super.determineBlockFor(writeEndpoints);
- }
-}
\ No newline at end of file
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Mon May 10 18:53:49 2010
@@ -19,6 +19,9 @@
package org.apache.cassandra.service;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -28,7 +31,6 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.SimpleCondition;
import org.slf4j.Logger;
@@ -38,19 +40,15 @@ public class QuorumResponseHandler<T> im
{
protected static final Logger logger = LoggerFactory.getLogger(
QuorumResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
- protected final Collection<Message> responses = new
LinkedBlockingQueue<Message>();;
- protected IResponseResolver<T> responseResolver;
+ protected final Collection<Message> responses;
+ private IResponseResolver<T> responseResolver;
private final long startTime;
- protected int blockfor;
-
- /**
- * Constructor when response count has to be calculated and blocked for.
- */
- public QuorumResponseHandler(IResponseResolver<T> responseResolver,
ConsistencyLevel consistencyLevel, String table)
+
+ public QuorumResponseHandler(int responseCount, IResponseResolver<T>
responseResolver)
{
- this.blockfor = determineBlockFor(consistencyLevel, table);
+ responses = new LinkedBlockingQueue<Message>();
this.responseResolver = responseResolver;
- this.startTime = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
}
public T get() throws TimeoutException, DigestMismatchException,
IOException
@@ -92,28 +90,9 @@ public class QuorumResponseHandler<T> im
public void response(Message message)
{
responses.add(message);
- if (responses.size() < blockfor) {
- return;
- }
if (responseResolver.isDataPresent(responses))
{
condition.signal();
}
}
-
- public int determineBlockFor(ConsistencyLevel consistencyLevel, String
table)
- {
- switch (consistencyLevel)
- {
- case ONE:
- case ANY:
- return 1;
- case QUORUM:
- return (DatabaseDescriptor.getQuorum(table)/ 2) + 1;
- case ALL:
- return DatabaseDescriptor.getReplicationFactor(table);
- default:
- throw new UnsupportedOperationException("invalid consistency
level: " + table.toString());
- }
- }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Mon May 10 18:53:49 2010
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +43,17 @@ public class ReadResponseResolver implem
{
private static Logger logger_ =
LoggerFactory.getLogger(ReadResponseResolver.class);
private final String table;
+ private final int responseCount;
- public ReadResponseResolver(String table)
+ public ReadResponseResolver(String table, int responseCount)
{
+ assert 1 <= responseCount && responseCount <=
DatabaseDescriptor.getReplicationFactor(table)
+ : "invalid response count " + responseCount;
+
+ this.responseCount = responseCount;
this.table = table;
}
-
+
/*
* This method for resolving read data should look at the timestamps of
each
* of the columns that are read and should pick up columns with the latest
@@ -153,6 +159,9 @@ public class ReadResponseResolver implem
public boolean isDataPresent(Collection<Message> responses)
{
+ if (responses.size() < responseCount)
+ return false;
+
boolean isDataPresent = false;
for (Message response : responses)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon
May 10 18:53:49 2010
@@ -213,9 +213,13 @@ public class StorageProxy implements Sto
List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
Collection<InetAddress> writeEndpoints =
rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table,
naturalEndpoints);
Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
+ int blockFor = determineBlockFor(writeEndpoints.size(),
consistency_level);
+
+ // avoid starting a write we know can't achieve the required
consistency
+ assureSufficientLiveNodes(blockFor, writeEndpoints,
hintedEndpoints, consistency_level);
// send out the writes, as in mutate() above, but this time
with a callback that tracks responses
- final WriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level,
table);
+ final WriteResponseHandler responseHandler =
ss.getWriteResponseHandler(blockFor, consistency_level, table);
responseHandlers.add(responseHandler);
Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
@@ -283,6 +287,29 @@ public class StorageProxy implements Sto
}
+ private static void assureSufficientLiveNodes(int blockFor,
Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress>
hintedEndpoints, ConsistencyLevel consistencyLevel)
+ throws UnavailableException
+ {
+ if (consistencyLevel == ConsistencyLevel.ANY)
+ {
+ // ensure there are blockFor distinct living nodes (hints are ok).
+ if (hintedEndpoints.keySet().size() < blockFor)
+ throw new UnavailableException();
+ }
+
+ // count destinations that are part of the desired target set
+ int liveNodes = 0;
+ for (InetAddress destination : hintedEndpoints.keySet())
+ {
+ if (writeEndpoints.contains(destination))
+ liveNodes++;
+ }
+ if (liveNodes < blockFor)
+ {
+ throw new UnavailableException();
+ }
+ }
+
private static void insertLocalMessage(final RowMutation rm, final
WriteResponseHandler responseHandler)
{
if (logger.isDebugEnabled())
@@ -298,6 +325,26 @@ public class StorageProxy implements Sto
StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
}
+ private static int determineBlockFor(int expandedTargets, ConsistencyLevel
consistency_level)
+ {
+ switch (consistency_level)
+ {
+ case ONE:
+ case ANY:
+ return 1;
+ case QUORUM:
+ return (expandedTargets / 2) + 1;
+ case DCQUORUM:
+ case DCQUORUMSYNC:
+ // TODO this is broken
+ return expandedTargets;
+ case ALL:
+ return expandedTargets;
+ default:
+ throw new UnsupportedOperationException("invalid consistency
level " + consistency_level);
+ }
+ }
+
/**
* Read the data from one replica. When we get
* the data we perform consistency checks and figure out if any repairs
need to be done to the replicas.
@@ -414,6 +461,10 @@ public class StorageProxy implements Sto
InetAddress dataPoint =
StorageService.instance.findSuitableEndpoint(command.table, command.key);
List<InetAddress> endpointList =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+ final String table = command.table;
+ int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
consistency_level);
+ if (endpointList.size() < responseCount)
+ throw new UnavailableException();
InetAddress[] endpoints = new InetAddress[endpointList.size()];
Message messages[] = new Message[endpointList.size()];
@@ -428,9 +479,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
- AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.table);
- QuorumResponseHandler<Row> quorumResponseHandler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
-
consistency_level, command.table);
+ QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new
ReadResponseResolver(command.table, responseCount));
MessagingService.instance.sendRR(messages, endpoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndpoints.add(endpoints);
@@ -454,10 +503,10 @@ public class StorageProxy implements Sto
{
if (randomlyReadRepair(command))
{
- IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table);
- AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.table);
- QuorumResponseHandler<Row> quorumResponseHandlerRepair =
rs.getQuorumResponseHandler(readResponseResolverRepair,
ConsistencyLevel.QUORUM,
-
command.table);
+ IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table,
DatabaseDescriptor.getQuorum(command.table));
+ QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getQuorum(command.table),
+ readResponseResolverRepair);
logger.info("DigestMismatchException: " + ex.getMessage());
Message messageRepair = command.makeReadMessage();
MessagingService.instance.sendRR(messageRepair,
commandEndpoints.get(commandIndex), quorumResponseHandlerRepair);
@@ -517,7 +566,9 @@ public class StorageProxy implements Sto
long startTime = System.nanoTime();
final String table = command.keyspace;
- List<Pair<AbstractBounds, List<InetAddress>>> ranges =
getRestrictedRanges(command.range, command.keyspace);
+ int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table),
consistency_level);
+
+ List<Pair<AbstractBounds, List<InetAddress>>> ranges =
getRestrictedRanges(command.range, command.keyspace, responseCount);
// now scan until we have enough results
List<Row> rows = new ArrayList<Row>(command.max_keys);
@@ -530,8 +581,8 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, endpoints);
- AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(table);
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level, table);
+ QuorumResponseHandler<List<Row>> handler = new
QuorumResponseHandler<List<Row>>(responseCount, resolver);
+
for (InetAddress endpoint : endpoints)
{
MessagingService.instance.sendRR(message, endpoint, handler);
@@ -627,7 +678,7 @@ public class StorageProxy implements Sto
* D, but we don't want any other results from it until after the (D,
T] range. Unwrapping so that
* the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
*/
- private static List<Pair<AbstractBounds, List<InetAddress>>>
getRestrictedRanges(AbstractBounds queryRange, String keyspace)
+ private static List<Pair<AbstractBounds, List<InetAddress>>>
getRestrictedRanges(AbstractBounds queryRange, String keyspace, int
responseCount)
throws UnavailableException
{
TokenMetadata tokenMetadata =
StorageService.instance.getTokenMetadata();
@@ -638,8 +689,11 @@ public class StorageProxy implements Sto
Token nodeToken = iter.next();
Range nodeRange = new
Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
+ if (endpoints.size() < responseCount)
+ throw new UnavailableException();
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
+ List<InetAddress> endpointsForCL = endpoints.subList(0,
responseCount);
Set<AbstractBounds> restrictedRanges =
queryRange.restrictTo(nodeRange);
for (AbstractBounds range : restrictedRanges)
{
@@ -647,7 +701,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("Adding to restricted ranges " +
unwrapped + " for " + nodeRange);
- ranges.add(new Pair<AbstractBounds,
List<InetAddress>>(unwrapped, endpoints));
+ ranges.add(new Pair<AbstractBounds,
List<InetAddress>>(unwrapped, endpointsForCL));
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon May 10 18:53:49 2010
@@ -50,6 +50,7 @@ import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -1520,6 +1521,11 @@ public class StorageService implements I
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter +
token.toString()));
}
+ public WriteResponseHandler getWriteResponseHandler(int blockFor,
ConsistencyLevel consistency_level, String table)
+ {
+ return getReplicationStrategy(table).getWriteResponseHandler(blockFor,
consistency_level, table);
+ }
+
public boolean isClientMode()
{
return isClientMode;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Mon May 10 18:53:49 2010
@@ -19,45 +19,41 @@
package org.apache.cassandra.service;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.net.InetAddress;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Multimap;
-
public class WriteResponseHandler implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger(
WriteResponseHandler.class );
- protected static final AbstractRackAwareSnitch endpointsnitch =
(AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
protected final SimpleCondition condition = new SimpleCondition();
- protected final int responseCount;
+ private final int responseCount;
protected final Collection<Message> responses;
protected AtomicInteger localResponses = new AtomicInteger(0);
- protected final long startTime;
- protected final ConsistencyLevel consistencyLevel;
- protected final String table;
+ private final long startTime;
- public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel
consistencyLevel, String table)
- throws UnavailableException
+ public WriteResponseHandler(int responseCount, String table)
{
- this.table = table;
- this.consistencyLevel = consistencyLevel;
- this.responseCount = determineBlockFor(writeEndpoints);
- assureSufficientLiveNodes(writeEndpoints, hintedEndpoints);
+ // at most one node per range can bootstrap at a time, and these will
be added to the write until
+ // bootstrap finishes (at which point we no longer need to write to
the old ones).
+ assert 1 <= responseCount && responseCount <= 2 *
DatabaseDescriptor.getReplicationFactor(table)
+ : "invalid response count " + responseCount;
+
+ this.responseCount = responseCount;
responses = new LinkedBlockingQueue<Message>();
startTime = System.currentTimeMillis();
}
@@ -110,54 +106,4 @@ public class WriteResponseHandler implem
condition.signal();
}
}
-
- public int determineBlockFor(Collection<InetAddress> writeEndpoints)
- {
- int blockFor = 0;
- switch (consistencyLevel)
- {
- case ONE:
- blockFor = 1;
- break;
- case ANY:
- blockFor = 1;
- break;
- case QUORUM:
- blockFor = (writeEndpoints.size() / 2) + 1;
- break;
- case ALL:
- blockFor = writeEndpoints.size();
- break;
- default:
- throw new UnsupportedOperationException("invalid consistency
level: " + consistencyLevel.toString());
- }
- // at most one node per range can bootstrap at a time, and these will
be added to the write until
- // bootstrap finishes (at which point we no longer need to write to
the old ones).
- assert 1 <= blockFor && blockFor <= 2 *
DatabaseDescriptor.getReplicationFactor(table)
- : "invalid response count " + responseCount;
- return blockFor;
- }
-
- public void assureSufficientLiveNodes(Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
- throws UnavailableException
- {
- if (consistencyLevel == ConsistencyLevel.ANY)
- {
- // ensure there are blockFor distinct living nodes (hints are ok).
- if (hintedEndpoints.keySet().size() < responseCount)
- throw new UnavailableException();
- }
-
- // count destinations that are part of the desired target set
- int liveNodes = 0;
- for (InetAddress destination : hintedEndpoints.keySet())
- {
- if (writeEndpoints.contains(destination))
- liveNodes++;
- }
- if (liveNodes < responseCount)
- {
- throw new UnavailableException();
- }
- }
-}
\ No newline at end of file
+}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
Mon May 10 18:53:49 2010
@@ -1,31 +1,18 @@
package org.apache.cassandra.locator;
-import java.io.IOException;
-import java.net.InetAddress;
-
-import javax.xml.parsers.ParserConfigurationException;
-
import org.junit.Test;
-import org.xml.sax.SAXException;
+
+import org.apache.cassandra.config.ConfigurationException;
public class DatacenterStrategyTest
{
- private String table = "Keyspace1";
-
- @Test
- public void testProperties() throws IOException,
ParserConfigurationException, SAXException
+ @Test
+ public void testProperties() throws ConfigurationException
{
- XMLFileSnitch snitch = new XMLFileSnitch();
- TokenMetadata metadata = new TokenMetadata();
- InetAddress localhost = InetAddress.getLocalHost();
- // Set the localhost to the tokenmetadata. Embeded cassandra way?
- // metadata.addBootstrapToken();
- DatacenterShardStrategy strategy = new DatacenterShardStrategy(new
TokenMetadata(), snitch);
- assert strategy.getReplicationFactor("dc1", table) == 3;
- assert strategy.getReplicationFactor("dc2", table) == 5;
- assert strategy.getReplicationFactor("dc3", table) == 1;
- // Query for the natural hosts
- // strategy.getNaturalEndpoints(token, table)
+ DatacenterShardStrategy strategy = new DatacenterShardStrategy(new
TokenMetadata(), new RackInferringSnitch());
+ assert strategy.getReplicationFactor("dc1") == 3;
+ assert strategy.getReplicationFactor("dc2") == 5;
+ assert strategy.getReplicationFactor("dc3") == 1;
}
}