Updated Branches: refs/heads/trunk 2f3f620ed -> 122775166
add read support for CL.SERIAL patch by jbellis; reviewed by slebresne for CASSANDRA-5441 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/12277516 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/12277516 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/12277516 Branch: refs/heads/trunk Commit: 1227751661e653fb39be1ada369a9eda8f1e9d7f Parents: 2f3f620 Author: Jonathan Ellis <[email protected]> Authored: Tue Apr 30 12:42:26 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Apr 30 12:42:26 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- interface/cassandra.thrift | 1 + .../apache/cassandra/thrift/ConsistencyLevel.java | 5 +- .../org/apache/cassandra/cql/QueryProcessor.java | 2 +- .../org/apache/cassandra/db/ConsistencyLevel.java | 10 +- .../org/apache/cassandra/service/StorageProxy.java | 162 ++++++++++----- .../org/apache/cassandra/service/paxos/Commit.java | 2 +- .../apache/cassandra/thrift/ThriftConversion.java | 17 +-- test/system/test_thrift_server.py | 3 + 9 files changed, 127 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f2f3c0..5e9edfd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,7 +2,7 @@ * Removed compatibility with pre-1.2.5 sstables and network messages (CASSANDRA-5511) * removed PBSPredictor (CASSANDRA-5455) - * CAS support (CASSANDRA-5062, ) + * CAS support (CASSANDRA-5062, 5441, 5443) * Leveled compaction performs size-tiered compactions in L0 (CASSANDRA-5371, 5439) * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/interface/cassandra.thrift 
---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index a57603c..b057fa0 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -228,6 +228,7 @@ enum ConsistencyLevel { ANY = 6, TWO = 7, THREE = 8, + SERIAL = 9, } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java index 8386797..96b6308 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java @@ -82,7 +82,8 @@ public enum ConsistencyLevel implements org.apache.thrift.TEnum { ALL(5), ANY(6), TWO(7), - THREE(8); + THREE(8), + SERIAL(9); private final int value; @@ -119,6 +120,8 @@ public enum ConsistencyLevel implements org.apache.thrift.TEnum { return TWO; case 8: return THREE; + case 9: + return SERIAL; default: return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/cql/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java index f896f24..7eaa111 100644 --- a/src/java/org/apache/cassandra/cql/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java @@ -74,7 +74,7 @@ public class QueryProcessor public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME); private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables) - throws 
InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException + throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException { List<ReadCommand> commands = new ArrayList<ReadCommand>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 9f4fc49..aec3c2d 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -258,11 +258,11 @@ public enum ConsistencyLevel { switch (this) { - case ANY: - throw new InvalidRequestException("ANY ConsistencyLevel is only supported for writes"); case LOCAL_QUORUM: requireNetworkTopologyStrategy(table); break; + case ANY: + throw new InvalidRequestException("ANY ConsistencyLevel is only supported for writes"); case EACH_QUORUM: throw new InvalidRequestException("EACH_QUORUM ConsistencyLevel is only supported for writes"); } @@ -276,6 +276,8 @@ public enum ConsistencyLevel case EACH_QUORUM: requireNetworkTopologyStrategy(table); break; + case SERIAL: + throw new InvalidRequestException("You must use conditional updates for serializable writes"); } } @@ -289,6 +291,10 @@ public enum ConsistencyLevel { throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + metadata.cfName); } + else if (this == ConsistencyLevel.SERIAL) + { + throw new InvalidRequestException("Counter operations are inherently non-serializable"); + } } private void requireNetworkTopologyStrategy(String table) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/service/StorageProxy.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 6e0c9e4..8bfbcd5 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -197,66 +197,21 @@ public class StorageProxy implements StorageProxyMBean * @return true if the operation succeeds in updating the row */ public static boolean cas(String table, String cfName, ByteBuffer key, ColumnFamily expected, ColumnFamily updates) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException + throws UnavailableException, IOException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName); long timedOut = System.currentTimeMillis() + DatabaseDescriptor.getCasContentionTimeout(); while (System.currentTimeMillis() < timedOut) { - // begin a paxos round - UUID ballot = UUIDGen.getTimeUUID(); - Token tk = StorageService.getPartitioner().getToken(key); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table); - int requiredParticipants = pendingEndpoints.size() + 1 + naturalEndpoints.size() / 2; // See CASSANDRA-833 - // for simplicity, we'll do a single liveness check at the start. the gains from repeating this check - // are not large enough to bother with. 
- List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); - if (liveEndpoints.size() < requiredParticipants) - throw new UnavailableException(ConsistencyLevel.SERIAL, requiredParticipants, liveEndpoints.size()); - - // prepare - logger.debug("Preparing {}", ballot); - Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants); - if (!summary.promised) - { - logger.debug("Some replicas have already promised a higher ballot than ours; aborting"); - // sleep a random amount to give the other proposer a chance to finish - FBUtilities.sleep(FBUtilities.threadLocalRandom().nextInt(100)); - continue; - } - - Commit inProgress = summary.inProgressCommit; - Commit mostRecent = summary.mostRecentCommit; + // for simplicity, we'll do a single liveness check at the start of each attempt + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(table, key); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; - // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that - // needs to be completed, so do it. - if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) - { - logger.debug("Finishing incomplete paxos round {}", inProgress); - if (proposePaxos(inProgress, liveEndpoints, requiredParticipants)) - commitPaxos(inProgress, liveEndpoints); - // no need to sleep here + UUID ballot = beginAndRepairPaxos(key, metadata, liveEndpoints, requiredParticipants); + if (ballot == null) continue; - } - - // To be able to propose our value on a new round, we need a quorum of replica to have learn the previous one. 
Why is explained at: - // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) - // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also - // mean we lost messages), we pro-actively "repair" those nodes, and retry. - Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(); - if (Iterables.size(missingMRC) > 0) - { - logger.debug("Repairing replicas that missed the most recent commit"); - commitPaxos(mostRecent, missingMRC); - // TODO: provided commits don't invalid the prepare we just did above (which they don't), we could just wait - // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means - // adding the ability to have commitPaxos block, which is exactly CASSANDRA-5442 will do. So once we have that - // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'. - continue; - } // read the current value and compare with expected logger.debug("Reading existing values for CAS precondition"); @@ -273,7 +228,7 @@ public class StorageProxy implements StorageProxyMBean // finish the paxos round w/ the desired updates // TODO turn null updates into delete? 
- Commit proposal = toPrepare.makeProposal(updates); + Commit proposal = Commit.newProposal(key, ballot, updates); logger.debug("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants)) { @@ -323,6 +278,74 @@ public class StorageProxy implements StorageProxyMBean return true; } + private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String table, ByteBuffer key) throws UnavailableException + { + Token tk = StorageService.getPartitioner().getToken(key); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table); + int requiredParticipants = pendingEndpoints.size() + 1 + naturalEndpoints.size() / 2; // See CASSANDRA-833 + List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); + if (liveEndpoints.size() < requiredParticipants) + throw new UnavailableException(ConsistencyLevel.SERIAL, requiredParticipants, liveEndpoints.size()); + return Pair.create(liveEndpoints, requiredParticipants); + } + + /** + * begin a Paxos session by sending a prepare request and completing any in-progress requests seen in the replies + * + * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of + * nodes have seen the mostRecentCommit. Otherwise, return null. 
+ */ + private static UUID beginAndRepairPaxos(ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants) + throws WriteTimeoutException, UnavailableException + { + UUID ballot = UUIDGen.getTimeUUID(); + + // prepare + logger.debug("Preparing {}", ballot); + Commit toPrepare = Commit.newPrepare(key, metadata, ballot); + PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants); + if (!summary.promised) + { + logger.debug("Some replicas have already promised a higher ballot than ours; aborting"); + // sleep a random amount to give the other proposer a chance to finish + FBUtilities.sleep(FBUtilities.threadLocalRandom().nextInt(100)); + return null; + } + + Commit inProgress = summary.inProgressCommit; + Commit mostRecent = summary.mostRecentCommit; + + // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that + // needs to be completed, so do it. + if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) + { + logger.debug("Finishing incomplete paxos round {}", inProgress); + if (proposePaxos(inProgress, liveEndpoints, requiredParticipants)) + commitPaxos(inProgress, liveEndpoints); + // no need to sleep here + return null; + } + + // To be able to propose our value on a new round, we need a quorum of replicas to have learned the previous one. The reason is explained at: + // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) + // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also + // mean we lost messages), we pro-actively "repair" those nodes, and return null so the caller retries.
+ Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(); + if (Iterables.size(missingMRC) > 0) + { + logger.debug("Repairing replicas that missed the most recent commit"); + commitPaxos(mostRecent, missingMRC); + // TODO: provided commits don't invalidate the prepare we just did above (which they don't), we could just wait + // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means + // adding the ability to have commitPaxos block, which is exactly what CASSANDRA-5442 will do. So once we have that + // latter ticket, we can pass CL.ALL to the commit above and return the ballot here instead of null. + return null; + } + + return ballot; + } + private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants) throws WriteTimeoutException, UnavailableException { @@ -1002,7 +1025,7 @@ public class StorageProxy implements StorageProxyMBean * a specific set of column names from a given column family.
*/ public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException + throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException, WriteTimeoutException { if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands)) { @@ -1010,11 +1033,40 @@ public class StorageProxy implements StorageProxyMBean ClientRequestMetrics.readUnavailables.inc(); throw new IsBootstrappingException(); } + long startTime = System.nanoTime(); List<Row> rows = null; try { - rows = fetchRows(commands, consistency_level); + if (consistency_level == ConsistencyLevel.SERIAL) + { + // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read + if (commands.size() > 1) + throw new InvalidRequestException("SERIAL consistency may only be requested for one row at a time"); + + ReadCommand command = commands.get(0); + CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName); + + long timedOut = System.currentTimeMillis() + DatabaseDescriptor.getCasContentionTimeout(); + while (true) + { + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.table, command.key); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; + + if (beginAndRepairPaxos(command.key, metadata, liveEndpoints, requiredParticipants) != null) + break; + + if (System.currentTimeMillis() >= timedOut) + throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1); + } + + rows = fetchRows(commands, ConsistencyLevel.QUORUM); + } + else + { + rows = fetchRows(commands, consistency_level); + } } catch (UnavailableException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 82a43e6..28d83ef 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -42,7 +42,7 @@ public class Commit return new Commit(key, ballot, EmptyColumns.factory.create(metadata)); } - public Commit makeProposal(ColumnFamily update) + public static Commit newProposal(ByteBuffer key, UUID ballot, ColumnFamily update) { return new Commit(key, ballot, updatesWithPaxosTime(update, ballot)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/thrift/ThriftConversion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java index fe28743..0642129 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java +++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java @@ -28,22 +28,6 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; */ public class ThriftConversion { - public static ConsistencyLevel toThrift(org.apache.cassandra.db.ConsistencyLevel cl) - { - switch (cl) - { - case ANY: return ConsistencyLevel.ANY; - case ONE: return ConsistencyLevel.ONE; - case TWO: return ConsistencyLevel.TWO; - case THREE: return ConsistencyLevel.THREE; - case QUORUM: return ConsistencyLevel.QUORUM; - case ALL: return ConsistencyLevel.ALL; - case LOCAL_QUORUM: return ConsistencyLevel.LOCAL_QUORUM; - case EACH_QUORUM: return ConsistencyLevel.EACH_QUORUM; - } - throw new AssertionError(); - } - public static org.apache.cassandra.db.ConsistencyLevel fromThrift(ConsistencyLevel cl) { switch (cl) @@ -56,6 +40,7 @@ public class ThriftConversion case ALL: return org.apache.cassandra.db.ConsistencyLevel.ALL; case LOCAL_QUORUM: return 
org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM; case EACH_QUORUM: return org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM; + case SERIAL: return org.apache.cassandra.db.ConsistencyLevel.SERIAL; } throw new AssertionError(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/test/system/test_thrift_server.py ---------------------------------------------------------------------- diff --git a/test/system/test_thrift_server.py b/test/system/test_thrift_server.py index 8fecd29..151ca48 100644 --- a/test/system/test_thrift_server.py +++ b/test/system/test_thrift_server.py @@ -242,6 +242,9 @@ class TestMutations(ThriftTester): assert not client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS) + # CL.SERIAL for reads + assert client.get('key1', ColumnPath('Standard1', column='c1'), ConsistencyLevel.SERIAL).column.value == 'value1' + def test_missing_super(self): _set_keyspace('Keyspace1') _expect_missing(lambda: client.get('key1', ColumnPath('Super1', 'sc1', _i64(1)), ConsistencyLevel.ONE))
