Updated Branches:
  refs/heads/trunk b7e49b3ab -> 8e003d842

use max(current time from system clock, inProgress + 1) as CAS ballot
patch by jbellis; reviewed by slebresne for CASSANDRA-5667


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e003d84
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e003d84
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e003d84

Branch: refs/heads/trunk
Commit: 8e003d842619bfce3585761684e7ba4114be89db
Parents: b7e49b3
Author: Jonathan Ellis <[email protected]>
Authored: Sun Jun 30 23:25:09 2013 -0700
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Jul 4 10:02:05 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/cql/QueryProcessor.java    |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  10 +-
 .../apache/cassandra/service/StorageProxy.java  | 122 +++++++++----------
 .../cassandra/service/paxos/PaxosState.java     |   2 +-
 .../org/apache/cassandra/utils/UUIDGen.java     |  11 ++
 6 files changed, 79 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 281a0aa..74f1753 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@
  * Removed compatibility with pre-1.2.5 sstables and network messages
    (CASSANDRA-5511)
  * removed PBSPredictor (CASSANDRA-5455)
- * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619, 5667)
  * Leveled compaction performs size-tiered compactions in L0 
    (CASSANDRA-5371, 5439)
  * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index e68aa7f..8e63021 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -72,7 +72,7 @@ public class QueryProcessor
     public static final String DEFAULT_KEY_NAME = 
CFMetaData.DEFAULT_KEY_ALIAS.toUpperCase();
 
     private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData 
metadata, SelectStatement select, List<ByteBuffer> variables, long now)
-    throws InvalidRequestException, ReadTimeoutException, 
UnavailableException, IsBootstrappingException, WriteTimeoutException
+    throws InvalidRequestException, ReadTimeoutException, 
UnavailableException, IsBootstrappingException
     {
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index d518468..e686f16 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -797,15 +797,17 @@ public class SystemKeyspace
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());
     }
 
-    public static void savePaxosCommit(Commit commit, boolean 
eraseInProgressProposal)
+    public static void savePaxosCommit(Commit commit, UUID inProgressBallot)
     {
-        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET 
most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND 
cf_id = %s";
+        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET 
in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s 
WHERE row_key = 0x%s AND cf_id = %s";
         // identical except adds proposal = null
-        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET 
proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE 
row_key = 0x%s AND cf_id = %s";
-        processInternal(String.format(eraseInProgressProposal ? eraseCql : 
preserveCql,
+        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET 
proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, 
most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        boolean proposalAfterCommit = inProgressBallot.timestamp() > 
commit.ballot.timestamp();
+        processInternal(String.format(proposalAfterCommit ? preserveCql : 
eraseCql,
                                       PAXOS_CF,
                                       UUIDGen.microsTimestamp(commit.ballot),
                                       paxosTtl(commit.update.metadata),
+                                      proposalAfterCommit ? inProgressBallot : 
commit.ballot,
                                       commit.ballot,
                                       
ByteBufferUtil.bytesToHex(commit.update.toBytes()),
                                       ByteBufferUtil.bytesToHex(commit.key),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 944b011..763e86a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -43,8 +43,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -219,9 +217,7 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> liveEndpoints = p.left;
             int requiredParticipants = p.right;
 
-            UUID ballot = beginAndRepairPaxos(key, metadata, liveEndpoints, 
requiredParticipants);
-            if (ballot == null)
-                continue;
+            UUID ballot = beginAndRepairPaxos(start, key, metadata, 
liveEndpoints, requiredParticipants);
 
             // read the current value and compare with expected
             Tracing.trace("Reading existing values for CAS precondition");
@@ -323,63 +319,64 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress 
requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static UUID beginAndRepairPaxos(ByteBuffer key, CFMetaData 
metadata, List<InetAddress> liveEndpoints, int requiredParticipants)
+    private static UUID beginAndRepairPaxos(long start, ByteBuffer key, 
CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants)
     throws WriteTimeoutException
     {
-        UUID ballot = UUIDGen.getTimeUUID();
+        long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 
-        // prepare
-        Tracing.trace("Preparing {}", ballot);
-        Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-        PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, 
requiredParticipants);
-        if (!summary.promised)
+        PrepareCallback summary = null;
+        while (System.nanoTime() - start < timeout) // elapsed = now - start
         {
-            Tracing.trace("Some replicas have already promised a higher ballot 
than ours; aborting");
-            // sleep a random amount to give the other proposer a chance to 
finish
-            
Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100),
 TimeUnit.MILLISECONDS);
-            return null;
-        }
+            long ballotMillis = summary == null
+                              ? System.currentTimeMillis()
+                              : Math.max(System.currentTimeMillis(), 1 + 
UUIDGen.unixTimestamp(summary.inProgressCommit.ballot));
+            UUID ballot = UUIDGen.getTimeUUID(ballotMillis);
 
-        Commit inProgress = summary.inProgressCommit;
-        Commit mostRecent = summary.mostRecentCommit;
+            // prepare
+            Tracing.trace("Preparing {}", ballot);
+            Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
+            summary = preparePaxos(toPrepare, liveEndpoints, 
requiredParticipants);
+            if (!summary.promised)
+            {
+                Tracing.trace("Some replicas have already promised a higher 
ballot than ours; aborting");
+                // sleep a random amount to give the other proposer a chance 
to finish
+                
Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100),
 TimeUnit.MILLISECONDS);
+                continue;
+            }
 
-        // If we have an in-progress ballot greater than the MRC we know, then 
it's an in-progress round that
-        // needs to be completed, so do it.
-        if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
-        {
-            Tracing.trace("Finishing incomplete paxos round {}", inProgress);
-            if (proposePaxos(inProgress, liveEndpoints, requiredParticipants))
+            Commit inProgress = summary.inProgressCommit;
+            Commit mostRecent = summary.mostRecentCommit;
+
+            // If we have an in-progress ballot greater than the MRC we know, 
then it's an in-progress round that
+            // needs to be completed, so do it.
+            if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
             {
-                try
-                {
+                Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
+                if (proposePaxos(inProgress, liveEndpoints, 
requiredParticipants))
                     commitPaxos(inProgress, ConsistencyLevel.QUORUM);
-                }
-                catch (WriteTimeoutException e)
-                {
-                    // let caller retry or turn it into a cas timeout, since 
it's someone elses' write we're applying
-                    return null;
-                }
+                continue;
+            }
+
+            // To be able to propose our value on a new round, we need a 
quorum of replicas to have learned the previous one. Why is explained at:
+            // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
+            // Since we waited for quorum nodes, if some of them haven't seen 
the last commit (which may just be a timing issue, but may also
+            // mean we lost messages), we pro-actively "repair" those nodes, 
and retry.
+            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit();
+            if (Iterables.size(missingMRC) > 0)
+            {
+                Tracing.trace("Repairing replicas that missed the most recent 
commit");
+                sendCommit(mostRecent, missingMRC);
+                // TODO: provided commits don't invalidate the prepare we just 
did above (which they don't), we could just wait
+                // for all the missingMRC to acknowledge this commit and then 
move on with proposing our value. But that means
+                // adding the ability to have commitPaxos block, which is 
exactly what CASSANDRA-5442 will do. So once we have that
+                // latter ticket, we can pass CL.ALL to the commit above and 
remove the 'continue'.
+                continue;
             }
-            return null;
-        }
 
-        // To be able to propose our value on a new round, we need a quorum of 
replica to have learn the previous one. Why is explained at:
-        // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
-        // Since we waited for quorum nodes, if some of them haven't seen the 
last commit (which may just be a timing issue, but may also
-        // mean we lost messages), we pro-actively "repair" those nodes, and 
retry.
-        Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit();
-        if (Iterables.size(missingMRC) > 0)
-        {
-            Tracing.trace("Repairing replicas that missed the most recent 
commit");
-            sendCommit(mostRecent, missingMRC);
-            // TODO: provided commits don't invalid the prepare we just did 
above (which they don't), we could just wait
-            // for all the missingMRC to acknowledge this commit and then move 
on with proposing our value. But that means
-            // adding the ability to have commitPaxos block, which is exactly 
CASSANDRA-5442 will do. So once we have that
-            // latter ticket, we can pass CL.ALL to the commit above and 
remove the 'continue'.
-            return null;
+            return ballot;
         }
 
-        return ballot;
+        throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, -1, -1);
     }
 
     /**
@@ -1069,7 +1066,7 @@ public class StorageProxy implements StorageProxyMBean
      * a specific set of column names from a given column family.
      */
     public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel 
consistency_level)
-    throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, InvalidRequestException, WriteTimeoutException
+    throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, InvalidRequestException
     {
         if (StorageService.instance.isBootstrapMode() && 
!systemKeyspaceQuery(commands))
         {
@@ -1087,22 +1084,21 @@ public class StorageProxy implements StorageProxyMBean
                 // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
                 if (commands.size() > 1)
                     throw new InvalidRequestException("SERIAL consistency may 
only be requested for one row at a time");
-
                 ReadCommand command = commands.get(0);
+
                 CFMetaData metadata = 
Schema.instance.getCFMetaData(command.ksName, command.cfName);
+                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(command.ksName, command.key);
+                List<InetAddress> liveEndpoints = p.left;
+                int requiredParticipants = p.right;
 
-                long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-                while (true)
+                // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
+                try
                 {
-                    Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(command.ksName, command.key);
-                    List<InetAddress> liveEndpoints = p.left;
-                    int requiredParticipants = p.right;
-
-                    if (beginAndRepairPaxos(command.key, metadata, 
liveEndpoints, requiredParticipants) != null)
-                        break;
-
-                    if (System.nanoTime() - start >= timeout)
-                        throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, -1, -1);
+                    beginAndRepairPaxos(start, command.key, metadata, 
liveEndpoints, requiredParticipants);
+                }
+                catch (WriteTimeoutException e)
+                {
+                    throw new ReadTimeoutException(ConsistencyLevel.SERIAL, 
-1, -1, false);
                 }
 
                 rows = fetchRows(commands, ConsistencyLevel.QUORUM);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ff0035e..ca69c16 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -116,7 +116,7 @@ public class PaxosState
         synchronized (lockFor(proposal.key))
         {
             PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, 
proposal.update.metadata());
-            SystemKeyspace.savePaxosCommit(proposal, 
!state.inProgressCommit.isAfter(proposal));
+            SystemKeyspace.savePaxosCommit(proposal, 
state.inProgressCommit.ballot);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e003d84/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java 
b/src/java/org/apache/cassandra/utils/UUIDGen.java
index bb3afd8..f385744 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -70,6 +70,16 @@ public class UUIDGen
         return new UUID(instance.createTimeSafe(), clockSeqAndNode);
     }
 
+    /**
+     * Creates a type 1 UUID (time-based UUID) whose timestamp is {@code when}, 
given in milliseconds.
+     *
+     * @return a UUID instance
+     */
+    public static UUID getTimeUUID(long when)
+    {
+        return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode);
+    }
+
     /** creates a type 1 uuid from raw bytes. */
     public static UUID getUUID(ByteBuffer raw)
     {
@@ -246,6 +256,7 @@ public class UUIDGen
         return createTime(nanosSince);
     }
 
+    /** @param when time in milliseconds */
     private long createTimeUnsafe(long when)
     {
         return createTimeUnsafe(when, 0);

Reply via email to