CAS should distinguish promised and accepted ballots. Patch by slebresne; reviewed by jbellis for CASSANDRA-6023.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bab28e4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bab28e4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bab28e4a Branch: refs/heads/trunk Commit: bab28e4ae4cf420f24433c24ea64177d78a7307b Parents: 6ec4eef Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Sep 16 08:43:46 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Sep 16 08:43:46 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../org/apache/cassandra/config/CFMetaData.java | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 38 ++++++++++---------- .../cassandra/service/paxos/PaxosState.java | 35 +++++++++--------- .../service/paxos/PrepareCallback.java | 7 ++-- .../service/paxos/PrepareResponse.java | 6 ++++ 6 files changed, 47 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5517cee..2f01b7d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,7 +15,7 @@ * Require superuser status for adding triggers (CASSANDRA-5963) * Make standalone scrubber handle old and new style leveled manifest (CASSANDRA-6005) - * Fix paxos bugs (CASSANDRA-6012, 6013) + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023) Merged from 1.2: 1.2.10 * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index be3da21..939163d 100644 --- 
a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -252,6 +252,7 @@ public final class CFMetaData + "row_key blob," + "cf_id UUID," + "in_progress_ballot timeuuid," + + "proposal_ballot timeuuid," + "proposal blob," + "most_recent_commit_at timeuuid," + "most_recent_commit blob," http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 7759114..3e608b3 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -782,14 +782,16 @@ public class SystemKeyspace if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); - Commit inProgress = new Commit(key, - row.getUUID("in_progress_ballot"), - row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata)); + Commit promised = new Commit(key, row.getUUID("in_progress_ballot"), EmptyColumns.factory.create(metadata)); + // either we have both a recently accepted ballot and update or we have neither + Commit accepted = row.has("proposal") + ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal"))) + : Commit.emptyCommit(key, metadata); // either most_recent_commit and most_recent_commit_at will both be set, or neither Commit mostRecent = row.has("most_recent_commit") ? 
new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit"))) : Commit.emptyCommit(key, metadata); - return new PaxosState(inProgress, mostRecent); + return new PaxosState(promised, accepted, mostRecent); } public static void savePaxosPromise(Commit promise) @@ -804,16 +806,16 @@ public class SystemKeyspace promise.update.id())); } - public static void savePaxosProposal(Commit commit) + public static void savePaxosProposal(Commit proposal) { - processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s", + processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s", PAXOS_CF, - UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata), - commit.ballot, - ByteBufferUtil.bytesToHex(commit.update.toBytes()), - ByteBufferUtil.bytesToHex(commit.key), - commit.update.id())); + UUIDGen.microsTimestamp(proposal.ballot), + paxosTtl(proposal.update.metadata), + proposal.ballot, + ByteBufferUtil.bytesToHex(proposal.update.toBytes()), + ByteBufferUtil.bytesToHex(proposal.key), + proposal.update.id())); } private static int paxosTtl(CFMetaData metadata) @@ -822,17 +824,15 @@ public class SystemKeyspace return Math.max(3 * 3600, metadata.getGcGraceSeconds()); } - public static void savePaxosCommit(Commit commit, UUID inProgressBallot) + public static void savePaxosCommit(Commit commit) { - String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; - // identical except adds proposal = null - String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; - boolean 
proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp(); - processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql, + // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) + // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. + String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s"; + processInternal(String.format(cql, PAXOS_CF, UUIDGen.microsTimestamp(commit.ballot), paxosTtl(commit.update.metadata), - proposalAfterCommit ? inProgressBallot : commit.ballot, commit.ballot, ByteBufferUtil.bytesToHex(commit.update.toBytes()), ByteBufferUtil.bytesToHex(commit.key), http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index aa27628..ff0b02c 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -48,20 +48,22 @@ public class PaxosState return locks[(0x7FFFFFFF & key.hashCode()) % locks.length]; } - private final Commit inProgressCommit; + private final Commit promised; + private final Commit accepted; private final Commit mostRecentCommit; public PaxosState(ByteBuffer key, CFMetaData metadata) { - this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata)); + this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata)); } - public PaxosState(Commit inProgressCommit, Commit mostRecentCommit) + public PaxosState(Commit promised, 
Commit accepted, Commit mostRecentCommit) { - assert inProgressCommit.key == mostRecentCommit.key; - assert inProgressCommit.update.metadata() == inProgressCommit.update.metadata(); + assert promised.key == accepted.key && accepted.key == mostRecentCommit.key; + assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata(); - this.inProgressCommit = inProgressCommit; + this.promised = promised; + this.accepted = accepted; this.mostRecentCommit = mostRecentCommit; } @@ -70,17 +72,17 @@ public class PaxosState synchronized (lockFor(toPrepare.key)) { PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); - if (toPrepare.isAfter(state.inProgressCommit)) + if (toPrepare.isAfter(state.promised)) { Tracing.trace("Promising ballot {}", toPrepare.ballot); SystemKeyspace.savePaxosPromise(toPrepare); - // return the pre-promise ballot so coordinator can pick the most recent in-progress value to resume - return new PrepareResponse(true, state.inProgressCommit, state.mostRecentCommit); + return new PrepareResponse(true, state.accepted, state.mostRecentCommit); } else { - Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.inProgressCommit); - return new PrepareResponse(false, state.inProgressCommit, state.mostRecentCommit); + Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised); + // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667) + return new PrepareResponse(false, state.promised, state.mostRecentCommit); } } } @@ -90,7 +92,7 @@ public class PaxosState synchronized (lockFor(proposal.key)) { PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); - if (proposal.hasBallot(state.inProgressCommit.ballot) || proposal.isAfter(state.inProgressCommit)) + if 
(proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) { Tracing.trace("Accepting proposal {}", proposal); SystemKeyspace.savePaxosProposal(proposal); @@ -98,7 +100,7 @@ public class PaxosState } else { - Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.inProgressCommit); + Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised); return false; } } @@ -115,10 +117,7 @@ public class PaxosState RowMutation rm = proposal.makeMutation(); Keyspace.open(rm.getKeyspaceName()).apply(rm, true); - synchronized (lockFor(proposal.key)) - { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); - SystemKeyspace.savePaxosCommit(proposal, state.inProgressCommit.ballot); - } + // We don't need to lock, we're just blindly updating + SystemKeyspace.savePaxosCommit(proposal); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 9293254..04a18b9 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -23,8 +23,8 @@ package org.apache.cassandra.service.paxos; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -43,7 +43,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> public Commit mostRecentInProgressCommit; public Commit mostRecentInProgressCommitWithUpdate; - private Map<InetAddress, Commit> commitsByReplica = new HashMap<InetAddress, 
Commit>(); + private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets) { @@ -73,6 +73,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> return; } + commitsByReplica.put(message.from, response.mostRecentCommit); if (response.mostRecentCommit.isAfter(mostRecentCommit)) mostRecentCommit = response.mostRecentCommit; @@ -81,8 +82,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty()) mostRecentInProgressCommitWithUpdate = response.inProgressCommit; - - latch.countDown(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java index 9f5fda6..d2bd835 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java @@ -38,6 +38,12 @@ public class PrepareResponse public static final PrepareResponseSerializer serializer = new PrepareResponseSerializer(); public final boolean promised; + + /* + * To maintain backward compatibility (see #6023), the meaning of inProgressCommit is a bit tricky. + * If promised is true, then that's the last accepted commit. If promise is false, that's just + * the previously promised ballot that made us refuse this one. + */ public final Commit inProgressCommit; public final Commit mostRecentCommit;