Author: jbellis
Date: Tue May 10 21:41:50 2011
New Revision: 1101660
URL: http://svn.apache.org/viewvc?rev=1101660&view=rev
Log:
optimize batches containing multiple updates to the same row
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2583
Modified:
cassandra/branches/cassandra-0.8.1/CHANGES.txt
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
Modified: cassandra/branches/cassandra-0.8.1/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/CHANGES.txt?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8.1/CHANGES.txt Tue May 10 21:41:50 2011
@@ -9,6 +9,8 @@
(CASSANDRA-2355)
* add CompositeType and DynamicCompositeType (CASSANDRA-2231)
* add CQL TTL support (CASSANDRA-2476)
+ * optimize batches containing multiple updates to the same row
+ (CASSANDRA-2583)
0.8.0-?
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/AbstractModification.java
Tue May 10 21:41:50 2011
@@ -23,7 +23,9 @@ package org.apache.cassandra.cql;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import java.nio.ByteBuffer;
import java.util.List;
public abstract class AbstractModification
@@ -91,10 +93,10 @@ public abstract class AbstractModificati
*
* @return list of the mutations
*
- * @throws org.apache.cassandra.thrift.InvalidRequestException on the wrong request
+ * @throws InvalidRequestException on the wrong request
*/
public abstract List<RowMutation> prepareRowMutations(String keyspace,
ClientState clientState)
- throws org.apache.cassandra.thrift.InvalidRequestException;
+ throws InvalidRequestException;
/**
* Convert statement into a list of mutations to apply on the server
@@ -105,8 +107,39 @@ public abstract class AbstractModificati
*
* @return list of the mutations
*
- * @throws org.apache.cassandra.thrift.InvalidRequestException on the wrong request
+ * @throws InvalidRequestException on the wrong request
*/
public abstract List<RowMutation> prepareRowMutations(String keyspace,
ClientState clientState, Long timestamp)
- throws org.apache.cassandra.thrift.InvalidRequestException;
+ throws InvalidRequestException;
+
+ /**
+ * Compute a row mutation for a single key
+ *
+ * @param key The key for mutation
+ * @param keyspace The keyspace
+ * @param timestamp The global timestamp for mutation
+ *
+ * @return row mutation
+ *
+ * @throws InvalidRequestException on the wrong request
+ */
+ public abstract RowMutation mutationForKey(ByteBuffer key, String
keyspace, Long timestamp)
+ throws InvalidRequestException;
+
+ /**
+ * Compute a row mutation for a single key and add it to the given RowMutation object
+ *
+ * @param mutation The row mutation to add computed mutation into
+ * @param keyspace The keyspace
+ * @param timestamp The global timestamp for mutation
+ *
+ * @throws InvalidRequestException on the wrong request
+ */
+ public abstract void mutationForKey(RowMutation mutation, String keyspace,
Long timestamp)
+ throws InvalidRequestException;
+
+ /**
+ * @return a list of the keys associated with the statement
+ */
+ public abstract List<Term> getKeys();
}
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/BatchStatement.java
Tue May 10 21:41:50 2011
@@ -20,14 +20,23 @@
*/
package org.apache.cassandra.cql;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
+import static
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+
/**
* A <code>BATCH</code> statement parsed from a CQL query.
*
@@ -78,11 +87,39 @@ public class BatchStatement
public List<RowMutation> getMutations(String keyspace, ClientState
clientState) throws InvalidRequestException
{
+ // To avoid unnecessary authorizations.
+ List<String> seenColumnFamilies = new ArrayList<String>();
+
List<RowMutation> batch = new LinkedList<RowMutation>();
for (AbstractModification statement : statements)
{
- batch.addAll(statement.prepareRowMutations(keyspace, clientState,
timestamp));
+ final String columnFamily = statement.getColumnFamily();
+
+ authorizeColumnFamily(keyspace, columnFamily, clientState,
seenColumnFamilies);
+
+ AbstractType<?> keyValidator = getKeyType(keyspace, columnFamily);
+
+ for (Term rawKey : statement.getKeys()) // for each key of the
statement
+ {
+ ByteBuffer key = rawKey.getByteBuffer(keyValidator);
+
+ boolean found = false;
+
+ for (RowMutation mutation : batch)
+ {
+ if (mutation.key().equals(key) &&
hasColumnFamily(mutation.getColumnFamilies(), columnFamily))
+ {
+ statement.mutationForKey(mutation, keyspace,
timestamp);
+
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) // if mutation was not found we should add a new
one
+ batch.add(statement.mutationForKey(key, keyspace,
timestamp));
+ }
}
return batch;
@@ -93,6 +130,34 @@ public class BatchStatement
return timestamp != null;
}
+ private boolean hasColumnFamily(Collection<ColumnFamily> columnFamilies,
String columnFamily)
+ {
+ for (ColumnFamily cf : columnFamilies)
+ {
+ if (cf.metadata().cfName.equals(columnFamily))
+ return true;
+ }
+
+ return false;
+ }
+
+ private void authorizeColumnFamily(String keyspace, String columnFamily,
ClientState state, List<String> seenCFs)
+ throws InvalidRequestException
+ {
+ validateColumnFamily(keyspace, columnFamily, false);
+
+ if (!seenCFs.contains(columnFamily))
+ {
+ state.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+ seenCFs.add(columnFamily);
+ }
+ }
+
+ public AbstractType<?> getKeyType(String keyspace, String columnFamily)
+ {
+ return DatabaseDescriptor.getCFMetaData(keyspace,
columnFamily).getKeyValidator();
+ }
+
public String toString()
{
return String.format("BatchStatement(statements=%s, consistency=%s)",
statements, consistency);
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/DeleteStatement.java
Tue May 10 21:41:50 2011
@@ -59,6 +59,7 @@ public class DeleteStatement extends Abs
return columns;
}
+ /** {@inheritDoc} */
public List<Term> getKeys()
{
return keys;
@@ -74,33 +75,45 @@ public class DeleteStatement extends Abs
public List<RowMutation> prepareRowMutations(String keyspace, ClientState
clientState, Long timestamp) throws InvalidRequestException
{
clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
- CFMetaData metadata = validateColumnFamily(keyspace, columnFamily,
false);
-
- AbstractType comparator = metadata.getComparatorFor(null);
AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace,
columnFamily).getKeyValidator();
List<RowMutation> rowMutations = new ArrayList<RowMutation>();
for (Term key : keys)
{
- RowMutation rm = new RowMutation(keyspace,
key.getByteBuffer(keyType));
+ rowMutations.add(mutationForKey(key.getByteBuffer(keyType),
keyspace, timestamp));
+ }
+
+ return rowMutations;
+ }
+
+ /** {@inheritDoc} */
+ public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long
timestamp) throws InvalidRequestException
+ {
+ RowMutation rm = new RowMutation(keyspace, key);
+
+ mutationForKey(rm, keyspace, timestamp);
- if (columns.size() < 1) // No columns, delete the row
- rm.delete(new QueryPath(columnFamily),
System.currentTimeMillis());
- else // Delete specific columns
+ return rm;
+ }
+
+ /** {@inheritDoc} */
+ public void mutationForKey(RowMutation mutation, String keyspace, Long
timestamp) throws InvalidRequestException
+ {
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily,
false);
+ AbstractType comparator = metadata.getComparatorFor(null);
+
+ if (columns.size() < 1) // No columns, delete the row
+ mutation.delete(new QueryPath(columnFamily),
System.currentTimeMillis());
+ else // Delete specific columns
+ {
+ for (Term column : columns)
{
- for (Term column : columns)
- {
- ByteBuffer columnName = column.getByteBuffer(comparator);
- validateColumnName(columnName);
- rm.delete(new QueryPath(columnFamily, null, columnName),
(timestamp == null) ? getTimestamp() : timestamp);
- }
+ ByteBuffer columnName = column.getByteBuffer(comparator);
+ validateColumnName(columnName);
+ mutation.delete(new QueryPath(columnFamily, null, columnName),
(timestamp == null) ? getTimestamp() : timestamp);
}
-
- rowMutations.add(rm);
}
-
- return rowMutations;
}
public String toString()
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1101660&r1=1101659&r2=1101660&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/cql/UpdateStatement.java
Tue May 10 21:41:50 2011
@@ -33,7 +33,6 @@ import org.apache.cassandra.service.Clie
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
-import static org.apache.cassandra.cql.QueryProcessor.validateKey;
import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
import static
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
@@ -157,36 +156,54 @@ public class UpdateStatement extends Abs
*/
private RowMutation mutationForKey(String keyspace, ByteBuffer key,
CFMetaData metadata, Long timestamp) throws InvalidRequestException
{
- validateKey(key);
+ RowMutation rm = new RowMutation(keyspace, key);
+
+ mutationForKey(rm, keyspace, metadata, timestamp);
+
+ return rm;
+ }
+
+ /** {@inheritDoc} */
+ public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long
timestamp) throws InvalidRequestException
+ {
+ return mutationForKey(keyspace, key, validateColumnFamily(keyspace,
columnFamily, false), timestamp);
+ }
+ /** {@inheritDoc} */
+ public void mutationForKey(RowMutation mutation, String keyspace, Long
timestamp) throws InvalidRequestException
+ {
+ mutationForKey(mutation, keyspace, validateColumnFamily(keyspace,
columnFamily, false), timestamp);
+ }
+
+ private void mutationForKey(RowMutation mutation, String keyspace,
CFMetaData metadata, Long timestamp) throws InvalidRequestException
+ {
AbstractType<?> comparator = getComparator(keyspace);
- RowMutation rm = new RowMutation(keyspace, key);
for (Map.Entry<Term, Term> column : getColumns().entrySet())
{
ByteBuffer colName = column.getKey().getByteBuffer(comparator);
ByteBuffer colValue =
column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
validateColumn(metadata, colName, colValue);
- rm.add(new QueryPath(columnFamily, null, colName),
- colValue,
- (timestamp == null) ? getTimestamp() : timestamp,
- getTimeToLive());
- }
- return rm;
+ mutation.add(new QueryPath(columnFamily, null, colName),
+ colValue,
+ (timestamp == null) ? getTimestamp() : timestamp,
+ getTimeToLive());
+ }
}
public String getColumnFamily()
{
return columnFamily;
}
-
+
+ /** {@inheritDoc} */
public List<Term> getKeys()
{
return keys;
}
-
+
public Map<Term, Term> getColumns() throws InvalidRequestException
{
// Created from an UPDATE