Author: gdusbabek
Date: Fri Jan 22 16:04:10 2010
New Revision: 902140
URL: http://svn.apache.org/viewvc?rev=902140&view=rev
Log:
Add ConsistencyLevel.ANY for writes. Patch by Gary Dusbabek, reviewed by
Jonathan Ellis. CASSANDRA-687
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/interface/cassandra.thrift
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Fri Jan 22 16:04:10 2010
@@ -13,6 +13,7 @@
* row caching [must be explicitly enabled per-CF in config] (CASSANDRA-678)
* present a useful measure of compaction progress in JMX (CASSANDRA-599)
* add bin/sstablekeys (CASSANDRA-679)
+ * add ConsistencyLevel.ANY (CASSANDRA-687)
0.5.1
Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Fri Jan 22 16:04:10
2010
@@ -138,12 +138,14 @@
*
* Write:
* ZERO Ensure nothing. A write happens asynchronously in background
+ * ANY Ensure that the write has been written once somewhere,
including possibly being hinted in a non-target node.
* ONE Ensure that the write has been written to at least 1 node's
commit log and memory table before responding to the client.
* QUORUM Ensure that the write has been written to <ReplicationFactor>
/ 2 + 1 nodes before responding to the client.
* ALL Ensure that the write is written to
<code><ReplicationFactor></code> nodes before responding to the client.
*
* Read:
* ZERO Not supported, because it doesn't make sense.
+ * ANY Not supported. You probably want ONE instead.
* ONE Will return the record returned by the first node to respond.
A consistency check is always done in a
* background thread to fix any consistency issues when
ConsistencyLevel.ONE is used. This means subsequent
* calls will have correct data even if the initial read gets an
older value. (This is called 'read repair'.)
@@ -158,6 +160,7 @@
DCQUORUM = 3,
DCQUORUMSYNC = 4,
ALL = 5,
+ ANY = 6,
}
/**
@@ -275,7 +278,7 @@
ColumnOrSuperColumn get(1:required string keyspace,
2:required string key,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel consistency_level=1)
+ 4:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire,
2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te),
/**
@@ -286,7 +289,7 @@
2:required string key,
3:required ColumnParent column_parent,
4:required SlicePredicate predicate,
- 5:required ConsistencyLevel
consistency_level=1)
+ 5:required ConsistencyLevel
consistency_level=ONE)
throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -297,7 +300,7 @@
map<string,ColumnOrSuperColumn> multiget(1:required string keyspace,
2:required list<string> keys,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel
consistency_level=1)
+ 4:required ConsistencyLevel
consistency_level=ONE)
throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -307,7 +310,7 @@
2:required list<string>
keys,
3:required ColumnParent
column_parent,
4:required
SlicePredicate predicate,
- 5:required
ConsistencyLevel consistency_level=1)
+ 5:required
ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -316,7 +319,7 @@
i32 get_count(1:required string keyspace,
2:required string key,
3:required ColumnParent column_parent,
- 4:required ConsistencyLevel consistency_level=1)
+ 4:required ConsistencyLevel consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
/**
@@ -328,7 +331,7 @@
4:required string start_key="",
5:required string finish_key="",
6:required i32 row_count=100,
- 7:required ConsistencyLevel
consistency_level=1)
+ 7:required ConsistencyLevel
consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
# modification methods
@@ -343,7 +346,7 @@
3:required ColumnPath column_path,
4:required binary value,
5:required i64 timestamp,
- 6:required ConsistencyLevel consistency_level=0)
+ 6:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
/**
@@ -355,7 +358,7 @@
void batch_insert(1:required string keyspace,
2:required string key,
3:required map<string, list<ColumnOrSuperColumn>> cfmap,
- 4:required ConsistencyLevel consistency_level=0)
+ 4:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
/**
@@ -367,12 +370,12 @@
2:required string key,
3:required ColumnPath column_path,
4:required i64 timestamp,
- 5:ConsistencyLevel consistency_level=0)
+ 5:ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
void batch_mutate(1:required string keyspace,
2:required map<string, map<string, list<Mutation>>>
mutation_map,
- 3:required ConsistencyLevel consistency_level=0)
+ 3:required ConsistencyLevel consistency_level=ZERO)
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
// Meta-APIs -- APIs to get information about the node or cluster,
Modified:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
---
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
(original)
+++
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
Fri Jan 22 16:04:10 2010
@@ -116,6 +116,7 @@
* Insert Columns or SuperColumns across different Column Families for the
same row key. batch_mutation is a
* map<string, list<ColumnOrSuperColumn>> -- a map which pairs column
family names with the relevant ColumnOrSuperColumn
* objects to insert.
+ * @deprecated use batch_mutate instead
*
* @param keyspace
* @param key
Modified:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
---
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
(original)
+++
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
Fri Jan 22 16:04:10 2010
@@ -21,12 +21,14 @@
*
* Write:
* ZERO Ensure nothing. A write happens asynchronously in background
+ * ANY Ensure that the write has been written once somewhere,
including possibly being hinted in a non-target node.
* ONE Ensure that the write has been written to at least 1 node's
commit log and memory table before responding to the client.
* QUORUM Ensure that the write has been written to <ReplicationFactor>
/ 2 + 1 nodes before responding to the client.
* ALL Ensure that the write is written to
<code><ReplicationFactor></code> nodes before responding to the client.
*
* Read:
* ZERO Not supported, because it doesn't make sense.
+ * ANY Not supported. You probably want ONE instead.
* ONE Will return the record returned by the first node to respond.
A consistency check is always done in a
* background thread to fix any consistency issues when
ConsistencyLevel.ONE is used. This means subsequent
* calls will have correct data even if the initial read gets an
older value. (This is called 'read repair'.)
@@ -40,7 +42,8 @@
QUORUM(2),
DCQUORUM(3),
DCQUORUMSYNC(4),
- ALL(5);
+ ALL(5),
+ ANY(6);
private static final Map<Integer, ConsistencyLevel> BY_VALUE = new
HashMap<Integer,ConsistencyLevel>() {{
for(ConsistencyLevel val : ConsistencyLevel.values()) {
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
Fri Jan 22 16:04:10 2010
@@ -85,6 +85,10 @@
{
throw new InvalidRequestException("Consistency level all is not
yet supported on read operations");
}
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ throw new InvalidRequestException("Consistency level any may not
be applied to read operations");
+ }
List<Row> rows;
try
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=902140&r1=902139&r2=902140&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Jan 22 16:04:10 2010
@@ -176,9 +176,9 @@
List<InetAddress> naturalEndpoints =
StorageService.instance.getNaturalEndpoints(rm.key());
Map<InetAddress, InetAddress> endpointMap =
StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
int blockFor = determineBlockFor(naturalEndpoints.size(),
endpointMap.size(), consistency_level);
-
+
// avoid starting a write we know can't achieve the required
consistency
- assureSufficientLiveNodes(endpointMap, blockFor);
+ assureSufficientLiveNodes(endpointMap, blockFor,
consistency_level);
// send out the writes, as in insert() above, but this time
with a callback that tracks responses
final WriteResponseHandler responseHandler =
StorageService.instance.getWriteResponseHandler(blockFor, consistency_level);
@@ -211,9 +211,11 @@
}
else
{
- // (hints aren't part of the callback since they don't
count towards consistency until they are on the final destination node)
Message hintedMessage = rm.makeRowMutationMessage();
hintedMessage.addHeader(RowMutation.HINT,
naturalTarget.getAddress());
+ // (hints are part of the callback and count towards
consistency only under CL.ANY)
+ if (consistency_level == ConsistencyLevel.ANY)
+
MessagingService.instance.addCallback(responseHandler,
hintedMessage.getMessageId());
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + "
to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " +
naturalTarget);
MessagingService.instance.sendOneWay(hintedMessage,
maybeHintedTarget);
@@ -240,20 +242,30 @@
}
- private static void assureSufficientLiveNodes(Map<InetAddress,
InetAddress> endpointMap, int blockFor)
+ private static void assureSufficientLiveNodes(Map<InetAddress,
InetAddress> endpointMap, int blockFor, ConsistencyLevel consistencyLevel)
throws UnavailableException
{
- int liveNodes = 0;
- for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
+ if (consistencyLevel == ConsistencyLevel.ANY)
{
- if (entry.getKey().equals(entry.getValue()))
- {
- liveNodes++;
- }
+ // ensure there are blockFor distinct living nodes (hints are ok).
+ if (new HashSet(endpointMap.values()).size() < blockFor)
+ throw new UnavailableException();
}
- if (liveNodes < blockFor)
+ else
{
- throw new UnavailableException();
+ // only count live + unhinted nodes.
+ int liveNodes = 0;
+ for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
+ {
+ if (entry.getKey().equals(entry.getValue()))
+ {
+ liveNodes++;
+ }
+ }
+ if (liveNodes < blockFor)
+ {
+ throw new UnavailableException();
+ }
}
}
@@ -296,6 +308,10 @@
{
blockFor = naturalTargets + bootstrapTargets;
}
+ else if (consistency_level == ConsistencyLevel.ANY)
+ {
+ blockFor = 1;
+ }
else
{
throw new UnsupportedOperationException("invalid consistency level
" + consistency_level);