Updated Branches:
  refs/heads/trunk 5eb530853 -> ab13579a3

Improve CQL3 batchlog support

patch by slebresne; reviewed by jbellis for CASSANDRA-4738


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab13579a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab13579a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab13579a

Branch: refs/heads/trunk
Commit: ab13579a3cfde211437987a889b25f8ae6d94725
Parents: 5eb5308
Author: Sylvain Lebresne <[email protected]>
Authored: Tue Oct 2 18:37:35 2012 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Tue Oct 2 18:37:35 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../cassandra/cql3/statements/BatchStatement.java  |   28 -------------
 .../cassandra/cql3/statements/DeleteStatement.java |    2 +
 .../cql3/statements/ModificationStatement.java     |   31 ++++++++++++++-
 .../cassandra/cql3/statements/UpdateStatement.java |   21 +++++++---
 5 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b33e40c..dd6061f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,7 +14,7 @@
  * (CQL3) Fix validation when using counter and regular columns in the same 
    table (CASSANDRA-4706)
  * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
- * Add support for batchlog in CQL3 (CASSANDRA-4545)
+ * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738)
  * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
  * Support repairing only the local DC nodes (CASSANDRA-4747)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 38df9bd..92c708b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -38,12 +38,6 @@ import org.apache.cassandra.utils.Pair;
  */
 public class BatchStatement extends ModificationStatement
 {
-    public static enum Type
-    {
-        LOGGED, UNLOGGED, COUNTER
-    }
-
-    protected final Type type;
     // statements to execute
     protected final List<ModificationStatement> statements;
 
@@ -85,28 +79,6 @@ public class BatchStatement extends ModificationStatement
     }
 
     @Override
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
-    {
-        Collection<? extends IMutation> mutations = getMutations(state, variables, false);
-        ConsistencyLevel cl = getConsistencyLevel();
-
-        switch (type)
-        {
-            case LOGGED:
-                StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
-                break;
-            case UNLOGGED:
-            case COUNTER:
-                StorageProxy.mutate(mutations, cl);
-                break;
-            default:
-                throw new AssertionError();
-        }
-
-        return null;
-    }
-
-    @Override
     public ConsistencyLevel getConsistencyLevel()
     {
         // We have validated that either the consistency is set, or all statements have the same default CL (see validate())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index fc0efb8..86af858 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -195,6 +195,8 @@ public class DeleteStatement extends ModificationStatement
     public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException
     {
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+        type = metadata.getDefaultValidator().isCommutative() ? Type.COUNTER : Type.LOGGED;
+
         cfDef = metadata.getCfDef();
         UpdateStatement.processKeys(cfDef, whereClause, processedKeys, boundNames);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 192d837..df58ca9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -41,6 +41,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 {
     public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
 
+    public static enum Type
+    {
+        LOGGED, UNLOGGED, COUNTER
+    }
+
+    protected Type type;
+
     private final ConsistencyLevel cLevel;
     private Long timestamp;
     private final int timeToLive;
@@ -73,10 +80,32 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
-        StorageProxy.mutate(getMutations(state, variables, false), getConsistencyLevel());
+        Collection<? extends IMutation> mutations = getMutations(state, variables, false);
+        ConsistencyLevel cl = getConsistencyLevel();
+
+        // The type should have been set by now or we have a bug
+        assert type != null;
+
+        switch (type)
+        {
+            case LOGGED:
+                if (mutations.size() > 1)
+                    StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
+                else
+                    StorageProxy.mutate(mutations, cl);
+                break;
+            case UNLOGGED:
+            case COUNTER:
+                StorageProxy.mutate(mutations, cl);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
         return null;
     }
 
+
     public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
     {
         for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index a4a310d..cb4261f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -285,23 +285,30 @@ public class UpdateStatement extends ModificationStatement
 
     public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException
     {
-        boolean hasCommutativeOperation = false;
-
         if (columns != null)
         {
             for (Pair<ColumnIdentifier, Operation> column : columns)
             {
                 if (column.right.getType() == Operation.Type.COUNTER)
-                    hasCommutativeOperation = true;
-
-                if (hasCommutativeOperation && column.right.getType() != Operation.Type.COUNTER)
+                {
+                    if (type == null)
+                        type = Type.COUNTER;
+                    else if (type != Type.COUNTER)
+                        throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed.");
+                }
+                else if (type == Type.COUNTER)
+                {
                     throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed.");
+                }
             }
         }
 
+        if (type == null)
+            type = Type.LOGGED;
+
         // Deal here with the keyspace overwrite thingy to avoid mistake
-        CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), hasCommutativeOperation);
-        if (hasCommutativeOperation)
+        CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), type == Type.COUNTER);
+        if (type == Type.COUNTER)
             getConsistencyLevel().validateCounterForWrite(metadata);
 
         cfDef = metadata.getCfDef();

Reply via email to