Author: jbellis
Date: Tue May 10 21:41:50 2011
New Revision: 1101660

URL: http://svn.apache.org/viewvc?rev=1101660&view=rev
Log:
optimize batches containing multiple updates to the same row
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2583

Modified:
    cassandra/branches/cassandra-0.8.1/CHANGES.txt
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java

Modified: cassandra/branches/cassandra-0.8.1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/CHANGES.txt?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8.1/CHANGES.txt Tue May 10 21:41:50 2011
@@ -9,6 +9,8 @@
    (CASSANDRA-2355)
  * add CompositeType and DynamicCompositeType (CASSANDRA-2231)
  * add CQL TTL support (CASSANDRA-2476)
+ * optimize batches containing multiple updates to the same row
+   (CASSANDRA-2583)
 
 
 0.8.0-?

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
 Tue May 10 21:41:50 2011
@@ -23,7 +23,9 @@ package org.apache.cassandra.cql;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public abstract class AbstractModification
@@ -91,10 +93,10 @@ public abstract class AbstractModificati
      *
      * @return list of the mutations
      *
-     * @throws org.apache.cassandra.thrift.InvalidRequestException on the 
wrong request
+     * @throws InvalidRequestException on the wrong request
      */
     public abstract List<RowMutation> prepareRowMutations(String keyspace, 
ClientState clientState)
-            throws org.apache.cassandra.thrift.InvalidRequestException;
+            throws InvalidRequestException;
 
     /**
      * Convert statement into a list of mutations to apply on the server
@@ -105,8 +107,39 @@ public abstract class AbstractModificati
      *
      * @return list of the mutations
      *
-     * @throws org.apache.cassandra.thrift.InvalidRequestException on the 
wrong request
+     * @throws InvalidRequestException on the wrong request
      */
     public abstract List<RowMutation> prepareRowMutations(String keyspace, 
ClientState clientState, Long timestamp)
-            throws org.apache.cassandra.thrift.InvalidRequestException;
+            throws InvalidRequestException;
+
+    /**
+     * Compute a row mutation for a single key
+     *
+     * @param key The key for mutation
+     * @param keyspace The keyspace
+     * @param timestamp The global timestamp for mutation
+     *
+     * @return row mutation
+     *
+     * @throws InvalidRequestException on the wrong request
+     */
+    public abstract RowMutation mutationForKey(ByteBuffer key, String 
keyspace, Long timestamp)
+        throws InvalidRequestException;
+
+    /**
+     * Compute a row mutation for a single key and add it to the given 
RowMutation object
+     *
+     * @param mutation The row mutation to add computed mutation into
+     * @param keyspace The keyspace
+     * @param timestamp The global timestamp for mutation
+     *
+     * @throws InvalidRequestException on the wrong request
+     */
+    public abstract void mutationForKey(RowMutation mutation, String keyspace, 
Long timestamp)
+            throws InvalidRequestException;
+
+    /**
+     * @return a list of the keys associated with the statement
+     */
+    public abstract List<Term> getKeys();
 }

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
 Tue May 10 21:41:50 2011
@@ -20,14 +20,23 @@
  */
 package org.apache.cassandra.cql;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
+import static 
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
  *
@@ -78,11 +87,39 @@ public class BatchStatement
 
     public List<RowMutation> getMutations(String keyspace, ClientState 
clientState) throws InvalidRequestException
     {
+        // To avoid unnecessary authorizations.
+        List<String> seenColumnFamilies = new ArrayList<String>();
+
         List<RowMutation> batch = new LinkedList<RowMutation>();
 
         for (AbstractModification statement : statements)
         {
-            batch.addAll(statement.prepareRowMutations(keyspace, clientState, 
timestamp));
+            final String columnFamily = statement.getColumnFamily();
+
+            authorizeColumnFamily(keyspace, columnFamily, clientState, 
seenColumnFamilies);
+
+            AbstractType<?> keyValidator = getKeyType(keyspace, columnFamily);
+
+            for (Term rawKey : statement.getKeys()) // for each key of the 
statement
+            {
+                ByteBuffer key = rawKey.getByteBuffer(keyValidator);
+
+                boolean found = false;
+
+                for (RowMutation mutation : batch)
+                {
+                    if (mutation.key().equals(key) && 
hasColumnFamily(mutation.getColumnFamilies(), columnFamily))
+                    {
+                        statement.mutationForKey(mutation, keyspace, 
timestamp);
+
+                        found = true;
+                        break;
+                    }
+                }
+
+                if (!found) // if mutation was not found we should add a new 
one
+                    batch.add(statement.mutationForKey(key, keyspace, 
timestamp));
+            }
         }
 
         return batch;
@@ -93,6 +130,34 @@ public class BatchStatement
         return timestamp != null;
     }
 
+    private boolean hasColumnFamily(Collection<ColumnFamily> columnFamilies, 
String columnFamily)
+    {
+        for (ColumnFamily cf : columnFamilies)
+        {
+            if (cf.metadata().cfName.equals(columnFamily))
+                return true;
+        }
+
+        return false;
+    }
+
+    private void authorizeColumnFamily(String keyspace, String columnFamily, 
ClientState state, List<String> seenCFs)
+    throws InvalidRequestException
+    {
+        validateColumnFamily(keyspace, columnFamily, false);
+
+        if (!seenCFs.contains(columnFamily))
+        {
+            state.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+            seenCFs.add(columnFamily);
+        }
+    }
+
+    public AbstractType<?> getKeyType(String keyspace, String columnFamily)
+    {
+        return DatabaseDescriptor.getCFMetaData(keyspace, 
columnFamily).getKeyValidator();
+    }
+
     public String toString()
     {
         return String.format("BatchStatement(statements=%s, consistency=%s)", 
statements, consistency);

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
 Tue May 10 21:41:50 2011
@@ -59,6 +59,7 @@ public class DeleteStatement extends Abs
         return columns;
     }
 
+    /** {@inheritDoc} */
     public List<Term> getKeys()
     {
         return keys;
@@ -74,33 +75,45 @@ public class DeleteStatement extends Abs
     public List<RowMutation> prepareRowMutations(String keyspace, ClientState 
clientState, Long timestamp) throws InvalidRequestException
     {
         clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
-        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, 
false);
-
-        AbstractType comparator = metadata.getComparatorFor(null);
         AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace, 
columnFamily).getKeyValidator();
 
         List<RowMutation> rowMutations = new ArrayList<RowMutation>();
 
         for (Term key : keys)
         {
-            RowMutation rm = new RowMutation(keyspace, 
key.getByteBuffer(keyType));
+            rowMutations.add(mutationForKey(key.getByteBuffer(keyType), 
keyspace, timestamp));
+        }
+
+        return rowMutations;
+    }
+
+    /** {@inheritDoc} */
+    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long 
timestamp) throws InvalidRequestException
+    {
+        RowMutation rm = new RowMutation(keyspace, key);
+
+        mutationForKey(rm, keyspace, timestamp);
 
-            if (columns.size() < 1) // No columns, delete the row
-                rm.delete(new QueryPath(columnFamily), 
System.currentTimeMillis());
-            else    // Delete specific columns
+        return rm;
+    }
+
+    /** {@inheritDoc} */
+    public void mutationForKey(RowMutation mutation, String keyspace, Long 
timestamp) throws InvalidRequestException
+    {
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, 
false);
+        AbstractType comparator = metadata.getComparatorFor(null);
+
+        if (columns.size() < 1) // No columns, delete the row
+            mutation.delete(new QueryPath(columnFamily), 
System.currentTimeMillis());
+        else    // Delete specific columns
+        {
+            for (Term column : columns)
             {
-                for (Term column : columns)
-                {
-                    ByteBuffer columnName = column.getByteBuffer(comparator);
-                    validateColumnName(columnName);
-                    rm.delete(new QueryPath(columnFamily, null, columnName), 
(timestamp == null) ? getTimestamp() : timestamp);
-                }
+                ByteBuffer columnName = column.getByteBuffer(comparator);
+                validateColumnName(columnName);
+                mutation.delete(new QueryPath(columnFamily, null, columnName), 
(timestamp == null) ? getTimestamp() : timestamp);
             }
-
-            rowMutations.add(rm);
         }
-
-        return rowMutations;
     }
 
     public String toString()

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
 Tue May 10 21:41:50 2011
@@ -33,7 +33,6 @@ import org.apache.cassandra.service.Clie
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
-import static org.apache.cassandra.cql.QueryProcessor.validateKey;
 import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
 
 import static 
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
@@ -157,36 +156,54 @@ public class UpdateStatement extends Abs
      */
     private RowMutation mutationForKey(String keyspace, ByteBuffer key, 
CFMetaData metadata, Long timestamp) throws InvalidRequestException
     {
-        validateKey(key);
+        RowMutation rm = new RowMutation(keyspace, key);
+
+        mutationForKey(rm, keyspace, metadata, timestamp);
+
+        return rm;
+    }
+
+    /** {@inheritDoc} */
+    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long 
timestamp) throws InvalidRequestException
+    {
+        return mutationForKey(keyspace, key, validateColumnFamily(keyspace, 
columnFamily, false), timestamp);
+    }
 
+    /** {@inheritDoc} */
+    public void mutationForKey(RowMutation mutation, String keyspace, Long 
timestamp) throws InvalidRequestException
+    {
+        mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, 
columnFamily, false), timestamp);
+    }
+
+    private void mutationForKey(RowMutation mutation, String keyspace, 
CFMetaData metadata, Long timestamp) throws InvalidRequestException
+    {
         AbstractType<?> comparator = getComparator(keyspace);
 
-        RowMutation rm = new RowMutation(keyspace, key);
         for (Map.Entry<Term, Term> column : getColumns().entrySet())
         {
             ByteBuffer colName = column.getKey().getByteBuffer(comparator);
             ByteBuffer colValue = 
column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
 
             validateColumn(metadata, colName, colValue);
-            rm.add(new QueryPath(columnFamily, null, colName),
-                   colValue,
-                   (timestamp == null) ? getTimestamp() : timestamp,
-                   getTimeToLive());
-        }
 
-        return rm;
+            mutation.add(new QueryPath(columnFamily, null, colName),
+                         colValue,
+                         (timestamp == null) ? getTimestamp() : timestamp,
+                         getTimeToLive());
+        }
     }
 
     public String getColumnFamily()
     {
         return columnFamily;
     }
-    
+
+    /** {@inheritDoc} */
     public List<Term> getKeys()
     {
         return keys;
     }
-    
+
     public Map<Term, Term> getColumns() throws InvalidRequestException
     {
         // Created from an UPDATE


Reply via email to