Updated Branches: refs/heads/trunk b7e49b3ab -> 8e003d842
use max(current time from system clock, inProgress + 1) as CAS ballot patch by jbellis; reviewed by slebresne for CASSANDRA-5667 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e003d84 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e003d84 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e003d84 Branch: refs/heads/trunk Commit: 8e003d842619bfce3585761684e7ba4114be89db Parents: b7e49b3 Author: Jonathan Ellis <[email protected]> Authored: Sun Jun 30 23:25:09 2013 -0700 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jul 4 10:02:05 2013 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../apache/cassandra/cql/QueryProcessor.java | 2 +- .../org/apache/cassandra/db/SystemKeyspace.java | 10 +- .../apache/cassandra/service/StorageProxy.java | 122 +++++++++---------- .../cassandra/service/paxos/PaxosState.java | 2 +- .../org/apache/cassandra/utils/UUIDGen.java | 11 ++ 6 files changed, 79 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 281a0aa..74f1753 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,7 +9,7 @@ * Removed compatibility with pre-1.2.5 sstables and network messages (CASSANDRA-5511) * removed PBSPredictor (CASSANDRA-5455) - * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619) + * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619, 5667) * Leveled compaction performs size-tiered compactions in L0 (CASSANDRA-5371, 5439) * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/cql/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java index e68aa7f..8e63021 100644 --- a/src/java/org/apache/cassandra/cql/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java @@ -72,7 +72,7 @@ public class QueryProcessor public static final String DEFAULT_KEY_NAME = CFMetaData.DEFAULT_KEY_ALIAS.toUpperCase(); private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now) - throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException + throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException { List<ReadCommand> commands = new ArrayList<ReadCommand>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index d518468..e686f16 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -797,15 +797,17 @@ public class SystemKeyspace return Math.max(3 * 3600, metadata.getGcGraceSeconds()); } - public static void savePaxosCommit(Commit commit, boolean eraseInProgressProposal) + public static void savePaxosCommit(Commit commit, UUID inProgressBallot) { - String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; + String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; // identical except adds proposal = null - String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; - processInternal(String.format(eraseInProgressProposal ? eraseCql : preserveCql, + String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; + boolean proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp(); + processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql, PAXOS_CF, UUIDGen.microsTimestamp(commit.ballot), paxosTtl(commit.update.metadata), + proposalAfterCommit ? inProgressBallot : commit.ballot, commit.ballot, ByteBufferUtil.bytesToHex(commit.update.toBytes()), ByteBufferUtil.bytesToHex(commit.key), http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 944b011..763e86a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -43,8 +43,6 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.marshal.UUIDType; @@ -219,9 +217,7 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; - UUID ballot = beginAndRepairPaxos(key, metadata, liveEndpoints, requiredParticipants); - if (ballot == null) - continue; + UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants); // read the current value and compare with expected Tracing.trace("Reading existing values for CAS precondition"); @@ -323,63 +319,64 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. */ - private static UUID beginAndRepairPaxos(ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants) + private static UUID beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants) throws WriteTimeoutException { - UUID ballot = UUIDGen.getTimeUUID(); + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - // prepare - Tracing.trace("Preparing {}", ballot); - Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants); - if (!summary.promised) + PrepareCallback summary = null; + while (start - System.nanoTime() < timeout) { - Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); - // sleep a random amount to give the other proposer a chance to finish - Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); - return null; - } + long ballotMillis = summary == null + ? System.currentTimeMillis() + : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.inProgressCommit.ballot)); + UUID ballot = UUIDGen.getTimeUUID(ballotMillis); - Commit inProgress = summary.inProgressCommit; - Commit mostRecent = summary.mostRecentCommit; + // prepare + Tracing.trace("Preparing {}", ballot); + Commit toPrepare = Commit.newPrepare(key, metadata, ballot); + summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants); + if (!summary.promised) + { + Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); + // sleep a random amount to give the other proposer a chance to finish + Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); + continue; + } - // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that - // needs to be completed, so do it. - if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) - { - Tracing.trace("Finishing incomplete paxos round {}", inProgress); - if (proposePaxos(inProgress, liveEndpoints, requiredParticipants)) + Commit inProgress = summary.inProgressCommit; + Commit mostRecent = summary.mostRecentCommit; + + // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that + // needs to be completed, so do it. + if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) { - try - { + Tracing.trace("Finishing incomplete paxos round {}", inProgress); + if (proposePaxos(inProgress, liveEndpoints, requiredParticipants)) commitPaxos(inProgress, ConsistencyLevel.QUORUM); - } - catch (WriteTimeoutException e) - { - // let caller retry or turn it into a cas timeout, since it's someone elses' write we're applying - return null; - } + continue; + } + + // To be able to propose our value on a new round, we need a quorum of replica to have learn the previous one. Why is explained at: + // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) + // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also + // mean we lost messages), we pro-actively "repair" those nodes, and retry. + Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(); + if (Iterables.size(missingMRC) > 0) + { + Tracing.trace("Repairing replicas that missed the most recent commit"); + sendCommit(mostRecent, missingMRC); + // TODO: provided commits don't invalid the prepare we just did above (which they don't), we could just wait + // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means + // adding the ability to have commitPaxos block, which is exactly CASSANDRA-5442 will do. So once we have that + // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'. + continue; } - return null; - } - // To be able to propose our value on a new round, we need a quorum of replica to have learn the previous one. Why is explained at: - // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) - // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also - // mean we lost messages), we pro-actively "repair" those nodes, and retry. - Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(); - if (Iterables.size(missingMRC) > 0) - { - Tracing.trace("Repairing replicas that missed the most recent commit"); - sendCommit(mostRecent, missingMRC); - // TODO: provided commits don't invalid the prepare we just did above (which they don't), we could just wait - // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means - // adding the ability to have commitPaxos block, which is exactly CASSANDRA-5442 will do. So once we have that - // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'. - return null; + return ballot; } - return ballot; + throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1); } /** @@ -1069,7 +1066,7 @@ public class StorageProxy implements StorageProxyMBean * a specific set of column names from a given column family. */ public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException, WriteTimeoutException + throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException { if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands)) { @@ -1087,22 +1084,21 @@ public class StorageProxy implements StorageProxyMBean // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read if (commands.size() > 1) throw new InvalidRequestException("SERIAL consistency may only be requested for one row at a time"); - ReadCommand command = commands.get(0); + CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (true) + // does the work of applying in-progress writes; throws UAE or timeout if it can't + try { - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; - - if (beginAndRepairPaxos(command.key, metadata, liveEndpoints, requiredParticipants) != null) - break; - - if (System.nanoTime() - start >= timeout) - throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1); + beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants); + } + catch (WriteTimeoutException e) + { + throw new ReadTimeoutException(ConsistencyLevel.SERIAL, -1, -1, false); } rows = fetchRows(commands, ConsistencyLevel.QUORUM); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index ff0035e..ca69c16 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -116,7 +116,7 @@ public class PaxosState synchronized (lockFor(proposal.key)) { PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); - SystemKeyspace.savePaxosCommit(proposal, !state.inProgressCommit.isAfter(proposal)); + SystemKeyspace.savePaxosCommit(proposal, state.inProgressCommit.ballot); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index bb3afd8..f385744 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -70,6 +70,16 @@ public class UUIDGen return new UUID(instance.createTimeSafe(), clockSeqAndNode); } + /** + * Creates a type 1 UUID (time-based UUID) with the timestamp of @param when, in milliseconds. + * + * @return a UUID instance + */ + public static UUID getTimeUUID(long when) + { + return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode); + } + /** creates a type 1 uuid from raw bytes. */ public static UUID getUUID(ByteBuffer raw) { @@ -246,6 +256,7 @@ public class UUIDGen return createTime(nanosSince); } + /** @param when time in milliseconds */ private long createTimeUnsafe(long when) { return createTimeUnsafe(when, 0);
