Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Fri Dec 18 21:53:27 2009
@@ -85,175 +85,208 @@
     };
 
     /**
-     * Use this method to have this RowMutation applied
+     * Use this method to have these RowMutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
      * the data across to some other replica.
      *
      * This is the ZERO consistency level. We do not wait for replies.
      *
-     * @param rm the mutation to be applied across the replicas
+     * @param mutations the mutations to be applied across the replicas
     */
-    public static void insert(final RowMutation rm)
+    public static void mutate(List<RowMutation> mutations)
     {
         long startTime = System.currentTimeMillis();
         try
         {
+            for (final RowMutation rm: mutations)
+            {
+                try
+        {
             List<InetAddress> naturalEndpoints = 
StorageService.instance().getNaturalEndpoints(rm.key());
-            Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-            Message unhintedMessage = null; // lazy initialize for non-local, 
unhinted writes
+                    Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+                    Message unhintedMessage = null; // lazy initialize for 
non-local, unhinted writes
 
-            // 3 cases:
-            // 1. local, unhinted write: run directly on write stage
-            // 2. non-local, unhinted write: send row mutation message
-            // 3. hinted write: add hint header, and send message
-            for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
-            {
-                InetAddress target = entry.getKey();
-                InetAddress hintedTarget = entry.getValue();
-                if (target.equals(hintedTarget))
-                {
-                    if (target.equals(FBUtilities.getLocalAddress()))
+                    // 3 cases:
+                    // 1. local, unhinted write: run directly on write stage
+                    // 2. non-local, unhinted write: send row mutation message
+                    // 3. hinted write: add hint header, and send message
+                    for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
                     {
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing local key " + 
rm.key());
-                        Runnable runnable = new Runnable()
+                        InetAddress target = entry.getKey();
+                        InetAddress hintedTarget = entry.getValue();
+                        if (target.equals(hintedTarget))
                         {
-                            public void run()
+                            if (target.equals(FBUtilities.getLocalAddress()))
                             {
-                                try
-                                {
-                                    rm.apply();
-                                }
-                                catch (IOException e)
+                                if (logger.isDebugEnabled())
+                                    logger.debug("insert writing local key " + 
rm.key());
+                                Runnable runnable = new Runnable()
                                 {
-                                    throw new IOError(e);
-                                }
+                                    public void run()
+                                    {
+                                        try
+                                        {
+                                            rm.apply();
+                                        }
+                                        catch (IOException e)
+                                        {
+                                            throw new IOError(e);
+                                        }
+                                    }
+                                };
+                                
StageManager.getStage(StageManager.mutationStage_).execute(runnable);
                             }
-                        };
-                        
StageManager.getStage(StageManager.mutationStage_).execute(runnable);
-                    }
-                    else
-                    {
-                        if (unhintedMessage == null)
-                            unhintedMessage = rm.makeRowMutationMessage();
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + rm.key() + " 
to " + unhintedMessage.getMessageId() + "@" + target);
-                        
MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            else
+                            {
+                                if (unhintedMessage == null)
+                                    unhintedMessage = 
rm.makeRowMutationMessage();
+                                if (logger.isDebugEnabled())
+                                    logger.debug("insert writing key " + 
rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
+                                
MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            }
+                        }
+                        else
+                        {
+                            Message hintedMessage = 
rm.makeRowMutationMessage();
+                            hintedMessage.addHeader(RowMutation.HINT, 
target.getAddress());
+                            if (logger.isDebugEnabled())
+                                logger.debug("insert writing key " + rm.key() 
+ " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + 
target);
+                            
MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                        }
                     }
                 }
-                else
+                catch (IOException e)
                 {
-                    Message hintedMessage = rm.makeRowMutationMessage();
-                    hintedMessage.addHeader(RowMutation.HINT, 
target.getAddress());
-                    if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + rm.key() + " to " 
+ hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                    MessagingService.instance().sendOneWay(hintedMessage, 
hintedTarget);
+                    throw new RuntimeException("error inserting key " + 
rm.key(), e);
                 }
             }
         }
-        catch (IOException e)
-        {
-            throw new RuntimeException("error inserting key " + rm.key(), e);
-        }
         finally
         {
             writeStats.add(System.currentTimeMillis() - startTime);
         }
     }
     
-    public static void insertBlocking(final RowMutation rm, int 
consistency_level) throws UnavailableException, TimedOutException
+    public static void mutateBlocking(List<RowMutation> mutations, int 
consistency_level) throws UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
+        ArrayList<WriteResponseHandler> responseHandlers = new 
ArrayList<WriteResponseHandler>();
+
+        RowMutation mostRecentRowMutation = null;
         try
         {
-            List<InetAddress> naturalEndpoints = 
StorageService.instance().getNaturalEndpoints(rm.key());
-            Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-            int blockFor = determineBlockFor(naturalEndpoints.size(), 
endpointMap.size(), consistency_level);
-
-            // avoid starting a write we know can't achieve the required 
consistency
-            int liveNodes = 0;
-            for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
-            {
-                if (entry.getKey().equals(entry.getValue()))
-                {
-                    liveNodes++;
-                }
-            }
-            if (liveNodes < blockFor)
-            {
-                throw new UnavailableException();
-            }
-
-            // send out the writes, as in insert() above, but this time with a 
callback that tracks responses
-            final WriteResponseHandler responseHandler = 
StorageService.instance().getWriteResponseHandler(blockFor, consistency_level);
-            Message unhintedMessage = null;
-            for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
+            for (RowMutation rm: mutations)
             {
-                InetAddress target = entry.getKey();
-                InetAddress hintedTarget = entry.getValue();
-
-                if (target.equals(hintedTarget))
+                mostRecentRowMutation = rm;
+                List<InetAddress> naturalEndpoints = 
StorageService.instance().getNaturalEndpoints(rm.key());
+                Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+                int blockFor = determineBlockFor(naturalEndpoints.size(), 
endpointMap.size(), consistency_level);
+    
+                // avoid starting a write we know can't achieve the required 
consistency
+                assureSufficientLiveNodes(endpointMap, blockFor);
+                
+                // send out the writes, as in mutate() above, but this time 
with a callback that tracks responses
+                final WriteResponseHandler responseHandler = 
StorageService.instance().getWriteResponseHandler(blockFor, consistency_level);
+                responseHandlers.add(responseHandler);
+                Message unhintedMessage = null;
+                for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
                 {
-                    if (target.equals(FBUtilities.getLocalAddress()))
+                    InetAddress naturalTarget = entry.getKey();
+                    InetAddress maybeHintedTarget = entry.getValue();
+    
+                    if (naturalTarget.equals(maybeHintedTarget))
                     {
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing local key " + 
rm.key());
-                        Runnable runnable = new Runnable()
+                        // not hinted
+                        if 
(naturalTarget.equals(FBUtilities.getLocalAddress()))
                         {
-                            public void run()
+                            insertLocalMessage(rm, responseHandler);
+                        }
+                        else
+                        {
+                            // belongs on a different server.  send it there.
+                            if (unhintedMessage == null)
                             {
-                                try
-                                {
-                                    rm.apply();
-                                    responseHandler.localResponse();
-                                }
-                                catch (IOException e)
-                                {
-                                    throw new IOError(e);
-                                }
+                                unhintedMessage = rm.makeRowMutationMessage();
+                                
MessagingService.instance().addCallback(responseHandler, 
unhintedMessage.getMessageId());
                             }
-                        };
-                        
StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                            if (logger.isDebugEnabled())
+                                logger.debug("insert writing key " + rm.key() 
+ " to " + unhintedMessage.getMessageId() + "@" + naturalTarget);
+                            
MessagingService.instance().sendOneWay(unhintedMessage, naturalTarget);
+                        }
                     }
                     else
                     {
-                        if (unhintedMessage == null)
-                        {
-                            unhintedMessage = rm.makeRowMutationMessage();
-                            
MessagingService.instance().addCallback(responseHandler, 
unhintedMessage.getMessageId());
-                        }
+                        // (hints aren't part of the callback since they don't 
count towards consistency until they are on the final destination node)
+                        Message hintedMessage = rm.makeRowMutationMessage();
+                        hintedMessage.addHeader(RowMutation.HINT, 
naturalTarget.getAddress());
                         if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + rm.key() + " 
to " + unhintedMessage.getMessageId() + "@" + target);
-                        
MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            logger.debug("insert writing key " + rm.key() + " 
to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + 
naturalTarget);
+                        MessagingService.instance().sendOneWay(hintedMessage, 
maybeHintedTarget);
                     }
                 }
-                else
-                {
-                    // (hints aren't part of the callback since they don't 
count towards consistency until they are on the final destination node)
-                    Message hintedMessage = rm.makeRowMutationMessage();
-                    hintedMessage.addHeader(RowMutation.HINT, 
target.getAddress());
-                    if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + rm.key() + " to " 
+ hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                    MessagingService.instance().sendOneWay(hintedMessage, 
hintedTarget);
-                }
             }
-
             // wait for writes.  throws timeoutexception if necessary
-            responseHandler.get();
+            for( WriteResponseHandler responseHandler : responseHandlers )
+            {
+                responseHandler.get();
+            }
         }
-        catch (TimeoutException e)
+        catch (IOException e)
         {
-            throw new TimedOutException();
+            if (mostRecentRowMutation == null)
+                throw new RuntimeException("no mutations were seen but found 
an error during write anyway", e);
+            else
+                throw new RuntimeException("error writing key " + 
mostRecentRowMutation.key(), e);
         }
-        catch (IOException e)
+        catch (TimeoutException e)
         {
-            throw new RuntimeException("error writing key " + rm.key(), e);
+            throw new TimedOutException();
         }
         finally
         {
             writeStats.add(System.currentTimeMillis() - startTime);
         }
+
+    }
+
+    private static void assureSufficientLiveNodes(Map<InetAddress, 
InetAddress> endpointMap, int blockFor)
+            throws UnavailableException
+    {
+        int liveNodes = 0;
+        for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
+        {
+            if (entry.getKey().equals(entry.getValue()))
+            {
+                liveNodes++;
+            }
+        }
+        if (liveNodes < blockFor)
+        {
+            throw new UnavailableException();
+        }
+    }
+
+    private static void insertLocalMessage(final RowMutation rm, final 
WriteResponseHandler responseHandler)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("insert writing local key " + rm.key());
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    rm.apply();
+                    responseHandler.localResponse();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
     }
 
     private static int determineBlockFor(int naturalTargets, int 
hintedTargets, int consistency_level)

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
 Fri Dec 18 21:53:27 2009
@@ -205,4 +205,79 @@
             throw new InvalidRequestException("range finish must come after 
start in the order of traversal");
         }
     }
+
+    public static void validateColumnOrSuperColumn(String keyspace, String 
cfName, ColumnOrSuperColumn cosc)
+            throws InvalidRequestException
+    {
+        if (cosc.column != null)
+        {
+            ThriftValidation.validateColumnPath(keyspace, new 
ColumnPath(cfName, null, cosc.column.name));
+        }
+
+        if (cosc.super_column != null)
+        {
+            for (Column c : cosc.super_column.columns)
+            {
+                ThriftValidation.validateColumnPath(keyspace, new 
ColumnPath(cfName, cosc.super_column.name, c.name));
+            }
+        }
+
+        if (cosc.column == null && cosc.super_column == null) {
+            throw new InvalidRequestException("ColumnOrSuperColumn must have 
one or both of Column or SuperColumn");
+        }
+    }
+
+    public static void validateMutation(String keyspace, String cfName, 
Mutation mut)
+            throws InvalidRequestException
+    {
+        ColumnOrSuperColumn cosc = mut.column_or_supercolumn;
+        Deletion del = mut.deletion;
+
+        if (cosc != null && del != null) {
+            throw new InvalidRequestException("Mutation may have either a 
ColumnOrSuperColumn or a Deletion, but not both");
+        }
+
+        if (cosc != null)
+        {
+            ThriftValidation.validateColumnOrSuperColumn(keyspace, cfName, 
cosc);
+        }
+        else if (del != null)
+        {
+            ThriftValidation.validateDeletion(keyspace, cfName, del);
+        }
+        else
+        {
+            throw new InvalidRequestException("Mutation must have one 
ColumnOrSuperColumn or one Deletion");
+        }
+    }
+
+    public static void validateDeletion(String keyspace, String cfName, 
Deletion del) throws InvalidRequestException
+    {
+        if (del.super_column == null && del.predicate == null)
+        {
+            throw new InvalidRequestException("A Deletion must have a 
SuperColumn, a SlicePredicate or both.");
+        }
+
+        if (del.predicate != null)
+        {
+            validateSlicePredicate(keyspace, cfName, del.super_column, 
del.predicate);
+            if (del.predicate.slice_range != null)
+                throw new InvalidRequestException("Deletion does not yet work 
correctly with SliceRanges.");
+        }
+    }
+
+    public static void validateSlicePredicate(String keyspace, String cfName, 
byte[] scName, SlicePredicate predicate) throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null) {
+            throw new InvalidRequestException("A SlicePredicate must be given 
a list of Columns, a SliceRange, or both");
+        }
+
+        if (predicate.slice_range != null) {
+            validateRange(keyspace, new ColumnParent(cfName, scName), 
predicate.slice_range);
+        }
+
+        if (predicate.column_names != null) {
+            validateColumns(keyspace, cfName, scName, predicate.column_names);
+        }
+    }
 }

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Fri Dec 18 21:53:27 
2009
@@ -34,6 +34,25 @@
                   SuperColumn(name='sc2', columns=[Column(_i64(5), 'value5', 
0),
                                                    Column(_i64(6), 'value6', 
0)])]
 
+def _assert_column(keyspace, column_family, key, column, value, ts = 0):
+    try:
+        assert client.get(keyspace, key, ColumnPath(column_family, 
column=column), ConsistencyLevel.ONE).column == Column(column, value, ts)
+    except NotFoundException:
+        raise Exception('expected %s:%s:%s:%s:%s, but was not present' % 
(keyspace, column_family, key, column, value) )
+
+def _assert_columnpath_exists(keyspace, key, column_path):
+    try:
+        assert client.get(keyspace, key, column_path, ConsistencyLevel.ONE)
+    except NotFoundException:
+        raise Exception('expected %s:%s with %s but was not present.' % 
(keyspace, key, column_path) )
+
+def _assert_no_columnpath(keyspace, key, column_path):
+    try:
+        client.get(keyspace, key, column_path, ConsistencyLevel.ONE)
+        assert False, ('columnpath %s existed in %s:%s when it should not' % 
(column_path, keyspace, key))
+    except NotFoundException:
+        assert True, 'column did not exist'
+
 def _insert_simple(block=True):
    return _insert_multi(['key1'], block)
 
@@ -81,11 +100,10 @@
          for result in _big_slice('Keyspace1', 'key1', 
ColumnParent('Standard1'))]
     assert L == _SIMPLE_COLUMNS, L
 
-def _insert_super():
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc1', _i64(4)), 
'value4', 0, ConsistencyLevel.ZERO)
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(5)), 
'value5', 0, ConsistencyLevel.ZERO)
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(6)), 
'value6', 0, ConsistencyLevel.ZERO)
-    time.sleep(0.1)
+def _insert_super(key='key1'):
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc1', _i64(4)), 
'value4', 0, ConsistencyLevel.ZERO)
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc2', _i64(5)), 
'value5', 0, ConsistencyLevel.ZERO)
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc2', _i64(6)), 
'value6', 0, ConsistencyLevel.ZERO)
 
 def _insert_range():
     client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c1'), 
'value1', 0, ConsistencyLevel.ONE)
@@ -113,7 +131,7 @@
     p = SlicePredicate(slice_range=SliceRange('a', 'z', False, 2))
     result = client.get_slice('Keyspace1','key1', ColumnParent('Standard1'), 
p, ConsistencyLevel.ONE)
     assert len(result) == 2, result
-               
+
 def _insert_super_range():
     client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc1', _i64(4)), 
'value4', 0, False)
     client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(5)), 
'value5', 0, False)
@@ -134,10 +152,10 @@
     assert result[0].super_column.name == 'sc3'
     assert result[1].super_column.name == 'sc2'
 
-def _verify_super(supercf='Super1'):
-    assert client.get('Keyspace1', 'key1', ColumnPath(supercf, 'sc1', 
_i64(4)), ConsistencyLevel.ONE).column == Column(_i64(4), 'value4', 0)
+def _verify_super(supercf='Super1', key='key1'):
+    assert client.get('Keyspace1', key, ColumnPath(supercf, 'sc1', _i64(4)), 
ConsistencyLevel.ONE).column == Column(_i64(4), 'value4', 0)
     slice = [result.super_column
-             for result in _big_slice('Keyspace1', 'key1', 
ColumnParent('Super1'))]
+             for result in _big_slice('Keyspace1', key, 
ColumnParent('Super1'))]
     assert slice == _SUPER_COLUMNS, slice
 
 def _expect_exception(fn, type_):
@@ -147,6 +165,7 @@
         pass
     else:
         raise Exception('expected %s; got %s' % (type_.__name__, r))
+    
 def _expect_missing(fn):
     _expect_exception(fn, NotFoundException)
 
@@ -278,6 +297,183 @@
     def test_batch_insert_blocking(self):
         _insert_batch(True)
         _verify_batch()
+        
+    def test_batch_mutate_standard_columns(self):
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in  range(27,32)] 
+        mutations = [Mutation(ColumnOrSuperColumn(c)) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in 
column_families)
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, 
ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+
+        for column_family in column_families:
+            for key in keys:
+                _assert_column('Keyspace1', column_family, key, 'c1', 'value1')
+
+    def test_batch_mutate_standard_columns_blocking(self):
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in  range(38,46)] 
+        mutations = [Mutation(ColumnOrSuperColumn(c)) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in 
column_families)
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+        
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for column_family in column_families:
+            for key in keys:
+                _assert_column('Keyspace1', column_family, key, 'c1', 'value1')
+
+    def test_batch_mutate_remove_standard_columns(self):
+
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in range(11,21)]
+        _insert_multi(keys)
+
+        mutations = [Mutation(deletion=Deletion(20, 
predicate=SlicePredicate(column_names=[c.name]))) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in 
column_families)
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for column_family in column_families:
+            for c in _SIMPLE_COLUMNS:
+                for key in keys:
+                    _assert_no_columnpath('Keyspace1', key, 
ColumnPath(column_family, column=c.name))
+
+    def test_batch_mutate_remove_super_columns_with_standard_under(self):
+        column_families = ['Super1', 'Super2']
+        keys = ['key_%d' % i for i in range(11,21)]
+        _insert_super()
+
+        mutations = []
+        for sc in _SUPER_COLUMNS:
+            names = []
+            for c in sc.columns:
+                names.append(c.name)
+            mutations.append(Mutation(deletion=Deletion(20, 
super_column=c.name, predicate=SlicePredicate(column_names=names))))
+
+        mutation_map = dict((column_family, mutations) for column_family in 
column_families)
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, 
ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+        for column_family in column_families:
+            for sc in _SUPER_COLUMNS:
+                for c in sc.columns:
+                    for key in keys:
+                        _assert_no_columnpath('Keyspace1', key, 
ColumnPath(column_family, super_column=sc.name, column=c.name))
+
+    def 
test_batch_mutate_remove_super_columns_with_none_given_underneath(self):
+        keys = ['key_%d' % i for i in range(17,21)]
+
+        for key in keys:
+            _insert_super(key)
+
+        mutations = []
+
+        for sc in _SUPER_COLUMNS:
+            mutations.append(Mutation(deletion=Deletion(20,
+                                                        super_column=sc.name)))
+
+        mutation_map = {'Super1': mutations}
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        # Sanity check
+        for sc in _SUPER_COLUMNS:
+            for key in keys:
+                _assert_columnpath_exists('Keyspace1', key, 
ColumnPath('Super1', super_column=sc.name))
+
+        client.batch_mutate('Keyspace1', keyed_mutations, 
ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+
+        for sc in _SUPER_COLUMNS:
+            for c in sc.columns:
+                for key in keys:
+                    _assert_no_columnpath('Keyspace1', key, 
ColumnPath('Super1', super_column=sc.name))
+
+    def test_batch_mutate_insertions_and_deletions(self):
+        first_insert = SuperColumn("sc1",
+                                   columns=[Column(_i64(20), 'value20', 3),
+                                            Column(_i64(21), 'value21', 3)])
+        second_insert = SuperColumn("sc1",
+                                    columns=[Column(_i64(20), 'value20', 3),
+                                             Column(_i64(21), 'value21', 3)])
+        first_deletion = {'super_column': "sc1",
+                          'predicate': SlicePredicate(column_names=[_i64(22), 
_i64(23)])}
+        second_deletion = {'super_column': "sc2",
+                           'predicate': SlicePredicate(column_names=[_i64(22), 
_i64(23)])}
+
+        keys = ['key_30', 'key_31']
+        for key in keys:
+            sc = SuperColumn('sc1',[Column(_i64(22), 'value22', 0),
+                                    Column(_i64(23), 'value23', 0)])
+            mutation = {'Super1': [ColumnOrSuperColumn(super_column=sc)]}
+            client.batch_insert('Keyspace1', key, mutation, 
ConsistencyLevel.ONE)
+
+            sc2 = SuperColumn('sc2', [Column(_i64(22), 'value22', 0),
+                                      Column(_i64(23), 'value23', 0)])
+            mutation2 = {'Super2': [ColumnOrSuperColumn(super_column=sc2)]}
+            client.batch_insert('Keyspace1', key, mutation2, 
ConsistencyLevel.ONE)
+
+        mutation_map = {
+            'Super1' : 
[Mutation(ColumnOrSuperColumn(super_column=first_insert)),
+                        Mutation(deletion=Deletion(3, **first_deletion))],
+        
+            'Super2' : [Mutation(deletion=Deletion(2, **second_deletion)),
+                        
Mutation(ColumnOrSuperColumn(super_column=second_insert))]
+            }
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for key in keys:
+            for c in [_i64(22), _i64(23)]:
+                _assert_no_columnpath('Keyspace1',
+                                      key,
+                                      ColumnPath('Super1', super_column='sc1', 
column=c))
+                _assert_no_columnpath('Keyspace1',
+                                      key,
+                                      ColumnPath('Super2', super_column='sc2', 
column=c))
+
+            for c in [_i64(20), _i64(21)]:
+                _assert_columnpath_exists('Keyspace1', key,
+                                          ColumnPath('Super1',
+                                                     super_column='sc1',
+                                                     column=c))
+                _assert_columnpath_exists('Keyspace1', key,
+                                          ColumnPath('Super2',
+                                                     super_column='sc1',
+                                                     column=c))
+
+    def test_batch_mutate_validates_deletions(self):
+        def empty_deletion():
+            client.batch_mutate('Keyspace1',
+                                {'key_33': {'Standard1': 
[Mutation(deletion=Deletion(2))]}},
+                                ConsistencyLevel.ONE)
+        _expect_exception(empty_deletion, InvalidRequestException)
+
+    def 
test_batch_mutate_does_not_accept_cosc_and_deletion_in_same_mutation(self):
+        def too_full():
+            col = ColumnOrSuperColumn(column=Column("foo", 'bar', 0))
+            dele = Deletion(2, predicate=SlicePredicate(column_names=['baz']))
+            client.batch_mutate('Keyspace1',
+                                {'key_34': {'Standard1': [Mutation(col, 
dele)]}},
+                                 ConsistencyLevel.ONE)
+        _expect_exception(too_full, InvalidRequestException)
+
+    def test_batch_mutate_does_not_yet_accept_slice_ranges(self):
+        def send_range():
+            sp = SlicePredicate(slice_range=SliceRange(start='0', finish="", 
count=10))
+            d = Deletion(2, predicate=sp)
+            client.batch_mutate('Keyspace1',
+                                {'key_35': 
{'Standard1':[Mutation(deletion=d)]}},
+                                 ConsistencyLevel.ONE)
+        _expect_exception(send_range, InvalidRequestException)
 
     def test_column_name_lengths(self):
         _expect_exception(lambda: client.insert('Keyspace1', 'key1', 
ColumnPath('Standard1', column=''), 'value', 0, ConsistencyLevel.ONE), 
InvalidRequestException)


Reply via email to