Updated Branches:
  refs/heads/trunk 2f3f620ed -> 122775166

add read support for CL.SERIAL
patch by jbellis; reviewed by slebresne for CASSANDRA-5441


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/12277516
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/12277516
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/12277516

Branch: refs/heads/trunk
Commit: 1227751661e653fb39be1ada369a9eda8f1e9d7f
Parents: 2f3f620
Author: Jonathan Ellis <[email protected]>
Authored: Tue Apr 30 12:42:26 2013 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Tue Apr 30 12:42:26 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 interface/cassandra.thrift                         |    1 +
 .../apache/cassandra/thrift/ConsistencyLevel.java  |    5 +-
 .../org/apache/cassandra/cql/QueryProcessor.java   |    2 +-
 .../org/apache/cassandra/db/ConsistencyLevel.java  |   10 +-
 .../org/apache/cassandra/service/StorageProxy.java |  162 ++++++++++-----
 .../org/apache/cassandra/service/paxos/Commit.java |    2 +-
 .../apache/cassandra/thrift/ThriftConversion.java  |   17 +--
 test/system/test_thrift_server.py                  |    3 +
 9 files changed, 127 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f2f3c0..5e9edfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,7 +2,7 @@
  * Removed compatibility with pre-1.2.5 sstables and network messages
    (CASSANDRA-5511)
  * removed PBSPredictor (CASSANDRA-5455)
- * CAS support (CASSANDRA-5062, )
+ * CAS support (CASSANDRA-5062, 5441, 5443)
  * Leveled compaction performs size-tiered compactions in L0 
    (CASSANDRA-5371, 5439)
  * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index a57603c..b057fa0 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -228,6 +228,7 @@ enum ConsistencyLevel {
     ANY = 6,
     TWO = 7,
     THREE = 8,
+    SERIAL = 9,
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
index 8386797..96b6308 100644
--- 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
+++ 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
@@ -82,7 +82,8 @@ public enum ConsistencyLevel implements 
org.apache.thrift.TEnum {
   ALL(5),
   ANY(6),
   TWO(7),
-  THREE(8);
+  THREE(8),
+  SERIAL(9);
 
   private final int value;
 
@@ -119,6 +120,8 @@ public enum ConsistencyLevel implements 
org.apache.thrift.TEnum {
         return TWO;
       case 8:
         return THREE;
+      case 9:
+        return SERIAL;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index f896f24..7eaa111 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -74,7 +74,7 @@ public class QueryProcessor
     public static final String DEFAULT_KEY_NAME = 
bufferToString(CFMetaData.DEFAULT_KEY_NAME);
 
     private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData 
metadata, SelectStatement select, List<ByteBuffer> variables)
-    throws InvalidRequestException, ReadTimeoutException, 
UnavailableException, IsBootstrappingException
+    throws InvalidRequestException, ReadTimeoutException, 
UnavailableException, IsBootstrappingException, WriteTimeoutException
     {
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 9f4fc49..aec3c2d 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -258,11 +258,11 @@ public enum ConsistencyLevel
     {
         switch (this)
         {
-            case ANY:
-                throw new InvalidRequestException("ANY ConsistencyLevel is 
only supported for writes");
             case LOCAL_QUORUM:
                 requireNetworkTopologyStrategy(table);
                 break;
+            case ANY:
+                throw new InvalidRequestException("ANY ConsistencyLevel is 
only supported for writes");
             case EACH_QUORUM:
                 throw new InvalidRequestException("EACH_QUORUM 
ConsistencyLevel is only supported for writes");
         }
@@ -276,6 +276,8 @@ public enum ConsistencyLevel
             case EACH_QUORUM:
                 requireNetworkTopologyStrategy(table);
                 break;
+            case SERIAL:
+                throw new InvalidRequestException("You must use conditional 
updates for serializable writes");
         }
     }
 
@@ -289,6 +291,10 @@ public enum ConsistencyLevel
         {
             throw new InvalidRequestException("cannot achieve CL > CL.ONE 
without replicate_on_write on columnfamily " + metadata.cfName);
         }
+        else if (this == ConsistencyLevel.SERIAL)
+        {
+            throw new InvalidRequestException("Counter operations are 
inherently non-serializable");
+        }
     }
 
     private void requireNetworkTopologyStrategy(String table) throws 
InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 6e0c9e4..8bfbcd5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -197,66 +197,21 @@ public class StorageProxy implements StorageProxyMBean
      * @return true if the operation succeeds in updating the row
      */
     public static boolean cas(String table, String cfName, ByteBuffer key, 
ColumnFamily expected, ColumnFamily updates)
-    throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException
+    throws UnavailableException, IOException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
 
         long timedOut = System.currentTimeMillis() + 
DatabaseDescriptor.getCasContentionTimeout();
         while (System.currentTimeMillis() < timedOut)
         {
-            // begin a paxos round
-            UUID ballot = UUIDGen.getTimeUUID();
-            Token tk = StorageService.getPartitioner().getToken(key);
-            List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(table, tk);
-            Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
-            int requiredParticipants = pendingEndpoints.size() + 1 + 
naturalEndpoints.size() / 2; // See CASSANDRA-833
-            // for simplicity, we'll do a single liveness check at the start.  
the gains from repeating this check
-            // are not large enough to bother with.
-            List<InetAddress> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
-            if (liveEndpoints.size() < requiredParticipants)
-                throw new UnavailableException(ConsistencyLevel.SERIAL, 
requiredParticipants, liveEndpoints.size());
-
-            // prepare
-            logger.debug("Preparing {}", ballot);
-            Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, 
requiredParticipants);
-            if (!summary.promised)
-            {
-                logger.debug("Some replicas have already promised a higher 
ballot than ours; aborting");
-                // sleep a random amount to give the other proposer a chance 
to finish
-                
FBUtilities.sleep(FBUtilities.threadLocalRandom().nextInt(100));
-                continue;
-            }
-
-            Commit inProgress = summary.inProgressCommit;
-            Commit mostRecent = summary.mostRecentCommit;
+            // for simplicity, we'll do a single liveness check at the start 
of each attempt
+            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(table, 
key);
+            List<InetAddress> liveEndpoints = p.left;
+            int requiredParticipants = p.right;
 
-            // If we have an in-progress ballot greater than the MRC we know, 
then it's an in-progress round that
-            // needs to be completed, so do it.
-            if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
-            {
-                logger.debug("Finishing incomplete paxos round {}", 
inProgress);
-                if (proposePaxos(inProgress, liveEndpoints, 
requiredParticipants))
-                    commitPaxos(inProgress, liveEndpoints);
-                // no need to sleep here
+            UUID ballot = beginAndRepairPaxos(key, metadata, liveEndpoints, 
requiredParticipants);
+            if (ballot == null)
                 continue;
-            }
-
-            // To be able to propose our value on a new round, we need a 
quorum of replica to have learn the previous one. Why is explained at:
-            // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
-            // Since we waited for quorum nodes, if some of them haven't seen 
the last commit (which may just be a timing issue, but may also
-            // mean we lost messages), we pro-actively "repair" those nodes, 
and retry.
-            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit();
-            if (Iterables.size(missingMRC) > 0)
-            {
-                logger.debug("Repairing replicas that missed the most recent 
commit");
-                commitPaxos(mostRecent, missingMRC);
-                // TODO: provided commits don't invalid the prepare we just 
did above (which they don't), we could just wait
-                // for all the missingMRC to acknowledge this commit and then 
move on with proposing our value. But that means
-                // adding the ability to have commitPaxos block, which is 
exactly CASSANDRA-5442 will do. So once we have that
-                // latter ticket, we can pass CL.ALL to the commit above and 
remove the 'continue'.
-                continue;
-            }
 
             // read the current value and compare with expected
             logger.debug("Reading existing values for CAS precondition");
@@ -273,7 +228,7 @@ public class StorageProxy implements StorageProxyMBean
 
             // finish the paxos round w/ the desired updates
             // TODO turn null updates into delete?
-            Commit proposal = toPrepare.makeProposal(updates);
+            Commit proposal = Commit.newProposal(key, ballot, updates);
             logger.debug("CAS precondition is met; proposing client-requested 
updates for {}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants))
             {
@@ -323,6 +278,74 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
+    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(String table, ByteBuffer key) throws UnavailableException
+    {
+        Token tk = StorageService.getPartitioner().getToken(key);
+        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(table, tk);
+        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        int requiredParticipants = pendingEndpoints.size() + 1 + 
naturalEndpoints.size() / 2; // See CASSANDRA-833
+        List<InetAddress> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
+        if (liveEndpoints.size() < requiredParticipants)
+            throw new UnavailableException(ConsistencyLevel.SERIAL, 
requiredParticipants, liveEndpoints.size());
+        return Pair.create(liveEndpoints, requiredParticipants);
+    }
+
+    /**
+     * begin a Paxos session by sending a prepare request and completing any 
in-progress requests seen in the replies
+     *
+     * @return the Paxos ballot promised by the replicas if no in-progress 
requests were seen and a quorum of
+     * nodes have seen the mostRecentCommit.  Otherwise, return null.
+     */
+    private static UUID beginAndRepairPaxos(ByteBuffer key, CFMetaData 
metadata, List<InetAddress> liveEndpoints, int requiredParticipants)
+    throws WriteTimeoutException, UnavailableException
+    {
+        UUID ballot = UUIDGen.getTimeUUID();
+
+        // prepare
+        logger.debug("Preparing {}", ballot);
+        Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
+        PrepareCallback summary = preparePaxos(toPrepare, liveEndpoints, 
requiredParticipants);
+        if (!summary.promised)
+        {
+            logger.debug("Some replicas have already promised a higher ballot 
than ours; aborting");
+            // sleep a random amount to give the other proposer a chance to 
finish
+            FBUtilities.sleep(FBUtilities.threadLocalRandom().nextInt(100));
+            return null;
+        }
+
+        Commit inProgress = summary.inProgressCommit;
+        Commit mostRecent = summary.mostRecentCommit;
+
+        // If we have an in-progress ballot greater than the MRC we know, then 
it's an in-progress round that
+        // needs to be completed, so do it.
+        if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
+        {
+            logger.debug("Finishing incomplete paxos round {}", inProgress);
+            if (proposePaxos(inProgress, liveEndpoints, requiredParticipants))
+                commitPaxos(inProgress, liveEndpoints);
+            // no need to sleep here
+            return null;
+        }
+
+        // To be able to propose our value on a new round, we need a quorum of 
replicas to have learned the previous one. Why is explained at:
+        // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810
+        // Since we waited for quorum nodes, if some of them haven't seen the 
last commit (which may just be a timing issue, but may also
+        // mean we lost messages), we pro-actively "repair" those nodes, and 
retry.
+        Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit();
+        if (Iterables.size(missingMRC) > 0)
+        {
+            logger.debug("Repairing replicas that missed the most recent 
commit");
+            commitPaxos(mostRecent, missingMRC);
+            // TODO: provided commits don't invalidate the prepare we just did 
above (which they don't), we could just wait
+            // for all the missingMRC to acknowledge this commit and then move 
on with proposing our value. But that means
+            // adding the ability to have commitPaxos block, which is exactly 
what CASSANDRA-5442 will do. So once we have that
+            // latter ticket, we can pass CL.ALL to the commit above and 
remove the 'return null'.
+            return null;
+        }
+
+        return ballot;
+    }
+
     private static PrepareCallback preparePaxos(Commit toPrepare, 
List<InetAddress> endpoints, int requiredParticipants)
     throws WriteTimeoutException, UnavailableException
     {
@@ -1002,7 +1025,7 @@ public class StorageProxy implements StorageProxyMBean
      * a specific set of column names from a given column family.
      */
     public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel 
consistency_level)
-    throws UnavailableException, IsBootstrappingException, ReadTimeoutException
+    throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, InvalidRequestException, WriteTimeoutException
     {
         if (StorageService.instance.isBootstrapMode() && 
!systemTableQuery(commands))
         {
@@ -1010,11 +1033,40 @@ public class StorageProxy implements StorageProxyMBean
             ClientRequestMetrics.readUnavailables.inc();
             throw new IsBootstrappingException();
         }
+
         long startTime = System.nanoTime();
         List<Row> rows = null;
         try
         {
-            rows = fetchRows(commands, consistency_level);
+            if (consistency_level == ConsistencyLevel.SERIAL)
+            {
+                // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
+                if (commands.size() > 1)
+                    throw new InvalidRequestException("SERIAL consistency may 
only be requested for one row at a time");
+
+                ReadCommand command = commands.get(0);
+                CFMetaData metadata = 
Schema.instance.getCFMetaData(command.table, command.cfName);
+
+                long timedOut = System.currentTimeMillis() + 
DatabaseDescriptor.getCasContentionTimeout();
+                while (true)
+                {
+                    Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(command.table, command.key);
+                    List<InetAddress> liveEndpoints = p.left;
+                    int requiredParticipants = p.right;
+
+                    if (beginAndRepairPaxos(command.key, metadata, 
liveEndpoints, requiredParticipants) != null)
+                        break;
+
+                    if (System.currentTimeMillis() >= timedOut)
+                        throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, -1, -1);
+                }
+
+                rows = fetchRows(commands, ConsistencyLevel.QUORUM);
+            }
+            else
+            {
+                rows = fetchRows(commands, consistency_level);
+            }
         }
         catch (UnavailableException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java 
b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 82a43e6..28d83ef 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -42,7 +42,7 @@ public class Commit
         return new Commit(key, ballot, EmptyColumns.factory.create(metadata));
     }
 
-    public Commit makeProposal(ColumnFamily update)
+    public static Commit newProposal(ByteBuffer key, UUID ballot, ColumnFamily 
update)
     {
         return new Commit(key, ballot, updatesWithPaxosTime(update, ballot));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java 
b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index fe28743..0642129 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -28,22 +28,6 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
  */
 public class ThriftConversion
 {
-    public static ConsistencyLevel 
toThrift(org.apache.cassandra.db.ConsistencyLevel cl)
-    {
-        switch (cl)
-        {
-            case ANY: return ConsistencyLevel.ANY;
-            case ONE: return ConsistencyLevel.ONE;
-            case TWO: return ConsistencyLevel.TWO;
-            case THREE: return ConsistencyLevel.THREE;
-            case QUORUM: return ConsistencyLevel.QUORUM;
-            case ALL: return ConsistencyLevel.ALL;
-            case LOCAL_QUORUM: return ConsistencyLevel.LOCAL_QUORUM;
-            case EACH_QUORUM: return ConsistencyLevel.EACH_QUORUM;
-        }
-        throw new AssertionError();
-    }
-
     public static org.apache.cassandra.db.ConsistencyLevel 
fromThrift(ConsistencyLevel cl)
     {
         switch (cl)
@@ -56,6 +40,7 @@ public class ThriftConversion
             case ALL: return org.apache.cassandra.db.ConsistencyLevel.ALL;
             case LOCAL_QUORUM: return 
org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
             case EACH_QUORUM: return 
org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+            case SERIAL: return 
org.apache.cassandra.db.ConsistencyLevel.SERIAL;
         }
         throw new AssertionError();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12277516/test/system/test_thrift_server.py
----------------------------------------------------------------------
diff --git a/test/system/test_thrift_server.py 
b/test/system/test_thrift_server.py
index 8fecd29..151ca48 100644
--- a/test/system/test_thrift_server.py
+++ b/test/system/test_thrift_server.py
@@ -242,6 +242,9 @@ class TestMutations(ThriftTester):
 
         assert not client.cas('key1', 'Standard1', None, _SIMPLE_COLUMNS)
 
+        # CL.SERIAL for reads
+        assert client.get('key1', ColumnPath('Standard1', column='c1'), 
ConsistencyLevel.SERIAL).column.value == 'value1'
+
     def test_missing_super(self):
         _set_keyspace('Keyspace1')
         _expect_missing(lambda: client.get('key1', ColumnPath('Super1', 'sc1', 
_i64(1)), ConsistencyLevel.ONE))

Reply via email to