Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Dec 18 21:53:27 2009
@@ -85,175 +85,208 @@
     };
 
     /**
-     * Use this method to have this RowMutation applied
+     * Use this method to have these RowMutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
      * the data across to some other replica.
      *
      * This is the ZERO consistency level. We do not wait for replies.
      *
-     * @param rm the mutation to be applied across the replicas
+     * @param mutations the mutations to be applied across the replicas
      */
-    public static void insert(final RowMutation rm)
+    public static void mutate(List<RowMutation> mutations)
     {
         long startTime = System.currentTimeMillis();
         try
         {
+            for (final RowMutation rm: mutations)
+            {
+                try
+                {
             List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
-            Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-            Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
+                    Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+                    Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
 
-            // 3 cases:
-            // 1. local, unhinted write: run directly on write stage
-            // 2. non-local, unhinted write: send row mutation message
-            // 3. hinted write: add hint header, and send message
-            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
-            {
-                InetAddress target = entry.getKey();
-                InetAddress hintedTarget = entry.getValue();
-                if (target.equals(hintedTarget))
-                {
-                    if (target.equals(FBUtilities.getLocalAddress()))
+                    // 3 cases:
+                    // 1. local, unhinted write: run directly on write stage
+                    // 2. non-local, unhinted write: send row mutation message
+                    // 3. hinted write: add hint header, and send message
+                    for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
                     {
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing local key " + rm.key());
-                        Runnable runnable = new Runnable()
+                        InetAddress target = entry.getKey();
+                        InetAddress hintedTarget = entry.getValue();
+                        if (target.equals(hintedTarget))
                         {
-                            public void run()
+                            if (target.equals(FBUtilities.getLocalAddress()))
                             {
-                                try
-                                {
-                                    rm.apply();
-                                }
-                                catch (IOException e)
+                                if (logger.isDebugEnabled())
+                                    logger.debug("insert writing local key " + rm.key());
+                                Runnable runnable = new Runnable()
                                 {
-                                    throw new IOError(e);
-                                }
+                                    public void run()
+                                    {
+                                        try
+                                        {
+                                            rm.apply();
+                                        }
+                                        catch (IOException e)
+                                        {
+                                            throw new IOError(e);
+                                        }
+                                    }
+                                };
+                                StageManager.getStage(StageManager.mutationStage_).execute(runnable);
                             }
-                        };
-                        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
-                    }
-                    else
-                    {
-                        if (unhintedMessage == null)
-                            unhintedMessage = rm.makeRowMutationMessage();
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
-                        MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            else
+                            {
+                                if (unhintedMessage == null)
+                                    unhintedMessage = rm.makeRowMutationMessage();
+                                if (logger.isDebugEnabled())
+                                    logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
+                                MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            }
+                        }
+                        else
+                        {
+                            Message hintedMessage = rm.makeRowMutationMessage();
+                            hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
+                            if (logger.isDebugEnabled())
+                                logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
+                            MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                        }
                     }
                 }
-                else
+                catch (IOException e)
                 {
-                    Message hintedMessage = rm.makeRowMutationMessage();
-                    hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
-                    if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                    MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                    throw new RuntimeException("error inserting key " + rm.key(), e);
                 }
             }
         }
-        catch (IOException e)
-        {
-            throw new RuntimeException("error inserting key " + rm.key(), e);
-        }
         finally
         {
             writeStats.add(System.currentTimeMillis() - startTime);
         }
     }
 
-    public static void insertBlocking(final RowMutation rm, int consistency_level) throws UnavailableException, TimedOutException
+    public static void mutateBlocking(List<RowMutation> mutations, int consistency_level) throws UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
+        ArrayList<WriteResponseHandler> responseHandlers = new ArrayList<WriteResponseHandler>();
+
+        RowMutation mostRecentRowMutation = null;
         try
         {
-            List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
-            Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-            int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
-
-            // avoid starting a write we know can't achieve the required consistency
-            int liveNodes = 0;
-            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
-            {
-                if (entry.getKey().equals(entry.getValue()))
-                {
-                    liveNodes++;
-                }
-            }
-            if (liveNodes < blockFor)
-            {
-                throw new UnavailableException();
-            }
-
-            // send out the writes, as in insert() above, but this time with a callback that tracks responses
-            final WriteResponseHandler responseHandler = StorageService.instance().getWriteResponseHandler(blockFor, consistency_level);
-            Message unhintedMessage = null;
-            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+            for (RowMutation rm: mutations)
             {
-                InetAddress target = entry.getKey();
-                InetAddress hintedTarget = entry.getValue();
-
-                if (target.equals(hintedTarget))
+                mostRecentRowMutation = rm;
+                List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
+                Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+                int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
+
+                // avoid starting a write we know can't achieve the required consistency
+                assureSufficientLiveNodes(endpointMap, blockFor);
+
+                // send out the writes, as in insert() above, but this time with a callback that tracks responses
+                final WriteResponseHandler responseHandler = StorageService.instance().getWriteResponseHandler(blockFor, consistency_level);
+                responseHandlers.add(responseHandler);
+                Message unhintedMessage = null;
+                for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
                 {
-                    if (target.equals(FBUtilities.getLocalAddress()))
+                    InetAddress naturalTarget = entry.getKey();
+                    InetAddress maybeHintedTarget = entry.getValue();
+
+                    if (naturalTarget.equals(maybeHintedTarget))
                     {
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing local key " + rm.key());
-                        Runnable runnable = new Runnable()
+                        // not hinted
+                        if (naturalTarget.equals(FBUtilities.getLocalAddress()))
                         {
-                            public void run()
+                            insertLocalMessage(rm, responseHandler);
+                        }
+                        else
+                        {
+                            // belongs on a different server. send it there.
+                            if (unhintedMessage == null)
                            {
-                                try
-                                {
-                                    rm.apply();
-                                    responseHandler.localResponse();
-                                }
-                                catch (IOException e)
-                                {
-                                    throw new IOError(e);
-                                }
+                                unhintedMessage = rm.makeRowMutationMessage();
+                                MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
                            }
-                        };
-                        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                            if (logger.isDebugEnabled())
+                                logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + naturalTarget);
+                            MessagingService.instance().sendOneWay(unhintedMessage, naturalTarget);
+                        }
                    }
                    else
                    {
-                        if (unhintedMessage == null)
-                        {
-                            unhintedMessage = rm.makeRowMutationMessage();
-                            MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
-                        }
+                        // (hints aren't part of the callback since they don't count towards consistency until they are on the final destination node)
+                        Message hintedMessage = rm.makeRowMutationMessage();
+                        hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
-                        MessagingService.instance().sendOneWay(unhintedMessage, target);
+                            logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + naturalTarget);
+                        MessagingService.instance().sendOneWay(hintedMessage, maybeHintedTarget);
                    }
                }
-                else
-                {
-                    // (hints aren't part of the callback since they don't count towards consistency until they are on the final destination node)
-                    Message hintedMessage = rm.makeRowMutationMessage();
-                    hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
-                    if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                    MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
-                }
            }
-            // wait for writes. throws timeoutexception if necessary
-            responseHandler.get();
+            for( WriteResponseHandler responseHandler : responseHandlers )
+            {
+                responseHandler.get();
+            }
        }
-        catch (TimeoutException e)
+        catch (IOException e)
        {
-            throw new TimedOutException();
+            if (mostRecentRowMutation == null)
+                throw new RuntimeException("no mutations were seen but found an error during write anyway", e);
+            else
+                throw new RuntimeException("error writing key " + mostRecentRowMutation.key(), e);
        }
-        catch (IOException e)
+        catch (TimeoutException e)
        {
-            throw new RuntimeException("error writing key " + rm.key(), e);
+            throw new TimedOutException();
        }
        finally
        {
            writeStats.add(System.currentTimeMillis() - startTime);
        }
+
+    }
+
+    private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap, int blockFor)
+            throws UnavailableException
+    {
+        int liveNodes = 0;
+        for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+        {
+            if (entry.getKey().equals(entry.getValue()))
+            {
+                liveNodes++;
+            }
+        }
+        if (liveNodes < blockFor)
+        {
+            throw new UnavailableException();
+        }
+    }
+
+    private static void insertLocalMessage(final RowMutation rm, final WriteResponseHandler responseHandler)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("insert writing local key " + rm.key());
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    rm.apply();
+                    responseHandler.localResponse();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
    }
 
    private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
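For orientation, here is a caller-side sketch of the two entry points this change introduces. It is illustrative only: StorageProxy.mutate(List<RowMutation>) and StorageProxy.mutateBlocking(List<RowMutation>, int) come from the diff above, but the RowMutation(table, key) constructor, the add(new QueryPath(...), value, timestamp) call, and the import locations are assumptions about the surrounding 2009-era codebase rather than anything shown here.

    // Illustrative sketch only.  The RowMutation/QueryPath construction and the
    // "assumed location" imports are guesses about the surrounding codebase; the
    // two StorageProxy calls are the methods added by this change.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.cassandra.db.RowMutation;        // assumed location
    import org.apache.cassandra.db.filter.QueryPath;   // assumed location
    import org.apache.cassandra.service.StorageProxy;

    public class BatchWriteSketch
    {
        public static void write(int consistencyLevel) throws Exception
        {
            List<RowMutation> mutations = new ArrayList<RowMutation>();
            for (String key : new String[]{ "key_1", "key_2" })
            {
                RowMutation rm = new RowMutation("Keyspace1", key);            // assumed constructor
                rm.add(new QueryPath("Standard1", null, "c1".getBytes()),      // assumed signature
                       "value1".getBytes(),
                       System.currentTimeMillis());
                mutations.add(rm);
            }

            // Fire-and-forget path (consistency level ZERO): no callbacks are
            // registered, and hints cover replicas that are down.
            StorageProxy.mutate(mutations);

            // Blocking path: one WriteResponseHandler is registered per mutation
            // and the call waits on all of them, throwing UnavailableException or
            // TimedOutException (declared as a plain Exception here for brevity).
            StorageProxy.mutateBlocking(mutations, consistencyLevel);
        }
    }

The detail worth noting in mutateBlocking above is that the handlers are collected first and only waited on after every mutation has been sent, so a multi-row batch does not pay one network round trip per row.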
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java Fri Dec 18 21:53:27 2009
@@ -205,4 +205,79 @@
             throw new InvalidRequestException("range finish must come after start in the order of traversal");
         }
     }
+
+    public static void validateColumnOrSuperColumn(String keyspace, String cfName, ColumnOrSuperColumn cosc)
+            throws InvalidRequestException
+    {
+        if (cosc.column != null)
+        {
+            ThriftValidation.validateColumnPath(keyspace, new ColumnPath(cfName, null, cosc.column.name));
+        }
+
+        if (cosc.super_column != null)
+        {
+            for (Column c : cosc.super_column.columns)
+            {
+                ThriftValidation.validateColumnPath(keyspace, new ColumnPath(cfName, cosc.super_column.name, c.name));
+            }
+        }
+
+        if (cosc.column == null && cosc.super_column == null) {
+            throw new InvalidRequestException("ColumnOrSuperColumn must have one or both of Column or SuperColumn");
+        }
+    }
+
+    public static void validateMutation(String keyspace, String cfName, Mutation mut)
+            throws InvalidRequestException
+    {
+        ColumnOrSuperColumn cosc = mut.column_or_supercolumn;
+        Deletion del = mut.deletion;
+
+        if (cosc != null && del != null) {
+            throw new InvalidRequestException("Mutation may have either a ColumnOrSuperColumn or a Deletion, but not both");
+        }
+
+        if (cosc != null)
+        {
+            ThriftValidation.validateColumnOrSuperColumn(keyspace, cfName, cosc);
+        }
+        else if (del != null)
+        {
+            ThriftValidation.validateDeletion(keyspace, cfName, del);
+        }
+        else
+        {
+            throw new InvalidRequestException("Mutation must have one ColumnOrSuperColumn or one Deletion");
+        }
+    }
+
+    public static void validateDeletion(String keyspace, String cfName, Deletion del) throws InvalidRequestException
+    {
+        if (del.super_column == null && del.predicate == null)
+        {
+            throw new InvalidRequestException("A Deletion must have a SuperColumn, a SlicePredicate or both.");
+        }
+
+        if (del.predicate != null)
+        {
+            validateSlicePredicate(keyspace, cfName, del.super_column, del.predicate);
+            if (del.predicate.slice_range != null)
+                throw new InvalidRequestException("Deletion does not yet work correctly with SliceRanges.");
+        }
+    }
+
+    public static void validateSlicePredicate(String keyspace, String cfName, byte[] scName, SlicePredicate predicate) throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null) {
+            throw new InvalidRequestException("A SlicePredicate must be given a list of Columns, a SliceRange, or both");
+        }
+
+        if (predicate.slice_range != null) {
+            validateRange(keyspace, new ColumnParent(cfName, scName), predicate.slice_range);
+        }
+
+        if (predicate.column_names != null) {
+            validateColumns(keyspace, cfName, scName, predicate.column_names);
+        }
+    }
 }
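To make the shapes accepted by the new validation concrete, here is a hedged sketch of the cases validateMutation() distinguishes. Only the ThriftValidation methods and the quoted error messages come from the diff above; building the Thrift-generated structs through public fields, the Column constructor, and the omitted imports are assumptions, and the calls presume a node configured with Keyspace1/Standard1 so the column-family lookups inside path validation succeed.

    // Hedged sketch; imports for the Thrift-generated structs are omitted because
    // their package location is not shown in this excerpt.
    import java.util.Arrays;

    public class MutationShapes
    {
        public static void main(String[] args) throws InvalidRequestException
        {
            // Case 1: an insertion -- a ColumnOrSuperColumn and no Deletion.
            Mutation insertOnly = new Mutation();
            insertOnly.column_or_supercolumn = new ColumnOrSuperColumn();
            insertOnly.column_or_supercolumn.column = new Column("c1".getBytes(), "v1".getBytes(), 0);
            ThriftValidation.validateMutation("Keyspace1", "Standard1", insertOnly);   // accepted

            // Case 2: a deletion -- a Deletion carrying a column-name predicate.
            Mutation deleteOnly = new Mutation();
            deleteOnly.deletion = new Deletion();
            deleteOnly.deletion.timestamp = 1;
            deleteOnly.deletion.predicate = new SlicePredicate();
            deleteOnly.deletion.predicate.column_names = Arrays.asList("c1".getBytes());
            ThriftValidation.validateMutation("Keyspace1", "Standard1", deleteOnly);   // accepted

            // Case 3: both at once -- rejected with "Mutation may have either a
            // ColumnOrSuperColumn or a Deletion, but not both".  An empty Mutation,
            // an empty Deletion, and a Deletion predicate that uses a SliceRange
            // are likewise rejected by the checks above.
            Mutation both = new Mutation();
            both.column_or_supercolumn = insertOnly.column_or_supercolumn;
            both.deletion = deleteOnly.deletion;
            ThriftValidation.validateMutation("Keyspace1", "Standard1", both);
        }
    }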
Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=892382&r1=892381&r2=892382&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Fri Dec 18 21:53:27 2009
@@ -34,6 +34,25 @@
                   SuperColumn(name='sc2', columns=[Column(_i64(5), 'value5', 0), Column(_i64(6), 'value6', 0)])]
 
+
+def _assert_column(keyspace, column_family, key, column, value, ts = 0):
+    try:
+        assert client.get(keyspace, key, ColumnPath(column_family, column=column), ConsistencyLevel.ONE).column == Column(column, value, ts)
+    except NotFoundException:
+        raise Exception('expected %s:%s:%s:%s:%s, but was not present' % (keyspace, column_family, key, column, value) )
+
+def _assert_columnpath_exists(keyspace, key, column_path):
+    try:
+        assert client.get(keyspace, key, column_path, ConsistencyLevel.ONE)
+    except NotFoundException:
+        raise Exception('expected %s:%s with %s but was not present.' % (keyspace, key, column_path) )
+
+def _assert_no_columnpath(keyspace, key, column_path):
+    try:
+        client.get(keyspace, key, column_path, ConsistencyLevel.ONE)
+        assert False, ('columnpath %s existed in %s:%s when it should not' % (column_path, keyspace, key))
+    except NotFoundException:
+        assert True, 'column did not exist'
+
 def _insert_simple(block=True):
     return _insert_multi(['key1'], block)
 
@@ -81,11 +100,10 @@
              for result in _big_slice('Keyspace1', 'key1', ColumnParent('Standard1'))]
     assert L == _SIMPLE_COLUMNS, L
 
-def _insert_super():
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc1', _i64(4)), 'value4', 0, ConsistencyLevel.ZERO)
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(5)), 'value5', 0, ConsistencyLevel.ZERO)
-    client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(6)), 'value6', 0, ConsistencyLevel.ZERO)
-    time.sleep(0.1)
+def _insert_super(key='key1'):
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc1', _i64(4)), 'value4', 0, ConsistencyLevel.ZERO)
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc2', _i64(5)), 'value5', 0, ConsistencyLevel.ZERO)
+    client.insert('Keyspace1', key, ColumnPath('Super1', 'sc2', _i64(6)), 'value6', 0, ConsistencyLevel.ZERO)
 
 def _insert_range():
     client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c1'), 'value1', 0, ConsistencyLevel.ONE)
@@ -113,7 +131,7 @@
     p = SlicePredicate(slice_range=SliceRange('a', 'z', False, 2))
     result = client.get_slice('Keyspace1','key1', ColumnParent('Standard1'), p, ConsistencyLevel.ONE)
     assert len(result) == 2, result
-    
+
 def _insert_super_range():
     client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc1', _i64(4)), 'value4', 0, False)
     client.insert('Keyspace1', 'key1', ColumnPath('Super1', 'sc2', _i64(5)), 'value5', 0, False)
@@ -134,10 +152,10 @@
     assert result[0].super_column.name == 'sc3'
     assert result[1].super_column.name == 'sc2'
 
-def _verify_super(supercf='Super1'):
-    assert client.get('Keyspace1', 'key1', ColumnPath(supercf, 'sc1', _i64(4)), ConsistencyLevel.ONE).column == Column(_i64(4), 'value4', 0)
+def _verify_super(supercf='Super1', key='key1'):
+    assert client.get('Keyspace1', key, ColumnPath(supercf, 'sc1', _i64(4)), ConsistencyLevel.ONE).column == Column(_i64(4), 'value4', 0)
     slice = [result.super_column
-             for result in _big_slice('Keyspace1', 'key1', ColumnParent('Super1'))]
+             for result in _big_slice('Keyspace1', key, ColumnParent('Super1'))]
     assert slice == _SUPER_COLUMNS, slice
 
 def _expect_exception(fn, type_):
@@ -147,6 +165,7 @@
         pass
     else:
         raise Exception('expected %s; got %s' % (type_.__name__, r))
+
 
 def _expect_missing(fn):
     _expect_exception(fn, NotFoundException)
@@ -278,6 +297,183 @@
     def test_batch_insert_blocking(self):
         _insert_batch(True)
         _verify_batch()
+
+    def test_batch_mutate_standard_columns(self):
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in range(27,32)]
+        mutations = [Mutation(ColumnOrSuperColumn(c)) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in column_families)
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+
+        for column_family in column_families:
+            for key in keys:
+                _assert_column('Keyspace1', column_family, key, 'c1', 'value1')
+
+    def test_batch_mutate_standard_columns_blocking(self):
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in range(38,46)]
+        mutations = [Mutation(ColumnOrSuperColumn(c)) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in column_families)
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for column_family in column_families:
+            for key in keys:
+                _assert_column('Keyspace1', column_family, key, 'c1', 'value1')
+
+    def test_batch_mutate_remove_standard_columns(self):
+
+        column_families = ['Standard1', 'Standard2']
+        keys = ['key_%d' % i for i in range(11,21)]
+        _insert_multi(keys)
+
+        mutations = [Mutation(deletion=Deletion(20, predicate=SlicePredicate(column_names=[c.name]))) for c in _SIMPLE_COLUMNS]
+        mutation_map = dict((column_family, mutations) for column_family in column_families)
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for column_family in column_families:
+            for c in _SIMPLE_COLUMNS:
+                for key in keys:
+                    _assert_no_columnpath('Keyspace1', key, ColumnPath(column_family, column=c.name))
+
+    def test_batch_mutate_remove_super_columns_with_standard_under(self):
+        column_families = ['Super1', 'Super2']
+        keys = ['key_%d' % i for i in range(11,21)]
+        _insert_super()
+
+        mutations = []
+        for sc in _SUPER_COLUMNS:
+            names = []
+            for c in sc.columns:
+                names.append(c.name)
+            mutations.append(Mutation(deletion=Deletion(20, super_column=c.name, predicate=SlicePredicate(column_names=names))))
+
+        mutation_map = dict((column_family, mutations) for column_family in column_families)
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+        for column_family in column_families:
+            for sc in _SUPER_COLUMNS:
+                for c in sc.columns:
+                    for key in keys:
+                        _assert_no_columnpath('Keyspace1', key, ColumnPath(column_family, super_column=sc.name, column=c.name))
+
+    def test_batch_mutate_remove_super_columns_with_none_given_underneath(self):
+        keys = ['key_%d' % i for i in range(17,21)]
+
+        for key in keys:
+            _insert_super(key)
+
+        mutations = []
+
+        for sc in _SUPER_COLUMNS:
+            mutations.append(Mutation(deletion=Deletion(20,
+                                                        super_column=sc.name)))
+
+        mutation_map = {'Super1': mutations}
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+
+        # Sanity check
+        for sc in _SUPER_COLUMNS:
+            for key in keys:
+                _assert_columnpath_exists('Keyspace1', key, ColumnPath('Super1', super_column=sc.name))
+
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ZERO)
+        time.sleep(0.1)
+
+        for sc in _SUPER_COLUMNS:
+            for c in sc.columns:
+                for key in keys:
+                    _assert_no_columnpath('Keyspace1', key, ColumnPath('Super1', super_column=sc.name))
+
+    def test_batch_mutate_insertions_and_deletions(self):
+        first_insert = SuperColumn("sc1",
+                                   columns=[Column(_i64(20), 'value20', 3),
+                                            Column(_i64(21), 'value21', 3)])
+        second_insert = SuperColumn("sc1",
+                                    columns=[Column(_i64(20), 'value20', 3),
+                                             Column(_i64(21), 'value21', 3)])
+        first_deletion = {'super_column': "sc1",
+                          'predicate': SlicePredicate(column_names=[_i64(22), _i64(23)])}
+        second_deletion = {'super_column': "sc2",
+                           'predicate': SlicePredicate(column_names=[_i64(22), _i64(23)])}
+
+        keys = ['key_30', 'key_31']
+        for key in keys:
+            sc = SuperColumn('sc1',[Column(_i64(22), 'value22', 0),
+                                    Column(_i64(23), 'value23', 0)])
+            mutation = {'Super1': [ColumnOrSuperColumn(super_column=sc)]}
+            client.batch_insert('Keyspace1', key, mutation, ConsistencyLevel.ONE)
+
+            sc2 = SuperColumn('sc2', [Column(_i64(22), 'value22', 0),
+                                      Column(_i64(23), 'value23', 0)])
+            mutation2 = {'Super2': [ColumnOrSuperColumn(super_column=sc2)]}
+            client.batch_insert('Keyspace1', key, mutation2, ConsistencyLevel.ONE)
+
+        mutation_map = {
+            'Super1' : [Mutation(ColumnOrSuperColumn(super_column=first_insert)),
+                        Mutation(deletion=Deletion(3, **first_deletion))],
+
+            'Super2' : [Mutation(deletion=Deletion(2, **second_deletion)),
+                        Mutation(ColumnOrSuperColumn(super_column=second_insert))]
+            }
+
+        keyed_mutations = dict((key, mutation_map) for key in keys)
+        client.batch_mutate('Keyspace1', keyed_mutations, ConsistencyLevel.ONE)
+
+        for key in keys:
+            for c in [_i64(22), _i64(23)]:
+                _assert_no_columnpath('Keyspace1',
+                                      key,
+                                      ColumnPath('Super1', super_column='sc1', column=c))
+                _assert_no_columnpath('Keyspace1',
+                                      key,
+                                      ColumnPath('Super2', super_column='sc2', column=c))
+
+            for c in [_i64(20), _i64(21)]:
+                _assert_columnpath_exists('Keyspace1', key,
+                                          ColumnPath('Super1',
+                                                     super_column='sc1',
+                                                     column=c))
+                _assert_columnpath_exists('Keyspace1', key,
+                                          ColumnPath('Super2',
+                                                     super_column='sc1',
+                                                     column=c))
+
+    def test_batch_mutate_validates_deletions(self):
+        def empty_deletion():
+            client.batch_mutate('Keyspace1',
+                                {'key_33': {'Standard1': [Mutation(deletion=Deletion(2))]}},
+                                ConsistencyLevel.ONE)
+        _expect_exception(empty_deletion, InvalidRequestException)
+
+    def test_batch_mutate_does_not_accept_cosc_and_deletion_in_same_mutation(self):
+        def too_full():
+            col = ColumnOrSuperColumn(column=Column("foo", 'bar', 0))
+            dele = Deletion(2, predicate=SlicePredicate(column_names=['baz']))
+            client.batch_mutate('Keyspace1',
+                                {'key_34': {'Standard1': [Mutation(col, dele)]}},
+                                ConsistencyLevel.ONE)
+        _expect_exception(too_full, InvalidRequestException)
+
+    def test_batch_mutate_does_not_yet_accept_slice_ranges(self):
+        def send_range():
+            sp = SlicePredicate(slice_range=SliceRange(start='0', finish="", count=10))
+            d = Deletion(2, predicate=sp)
+            client.batch_mutate('Keyspace1',
+                                {'key_35': {'Standard1':[Mutation(deletion=d)]}},
+                                ConsistencyLevel.ONE)
+        _expect_exception(send_range, InvalidRequestException)
 
     def test_column_name_lengths(self):
         _expect_exception(lambda: client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column=''), 'value', 0, ConsistencyLevel.ONE), InvalidRequestException)
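The tests above drive the new batch_mutate call from Python; for a Java Thrift client the same request is a Map<String, Map<String, List<Mutation>>> keyed first by row key and then by column family. The sketch below is illustrative only: the generated Cassandra.Client interface, an already-opened connection, the public-field struct construction, and the int-valued ConsistencyLevel.ONE constant are assumptions rather than anything shown in this excerpt.

    // Hedged Java-client sketch of the request shape the Python tests build:
    // { row key -> { column family -> [Mutation, ...] } }.
    // Imports for the Thrift-generated types are omitted; their package location
    // is not shown in this excerpt.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchMutateShape
    {
        public static void send(Cassandra.Client client) throws Exception
        {
            Column c1 = new Column();                      // public Thrift fields assumed
            c1.name = "c1".getBytes();
            c1.value = "value1".getBytes();
            c1.timestamp = 0;

            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
            cosc.column = c1;

            Mutation insert = new Mutation();
            insert.column_or_supercolumn = cosc;

            List<Mutation> mutations = new ArrayList<Mutation>();
            mutations.add(insert);

            // column family name -> mutations to apply to that family
            Map<String, List<Mutation>> perKey = new HashMap<String, List<Mutation>>();
            perKey.put("Standard1", mutations);

            // row key -> per-column-family map
            Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
            mutationMap.put("key_27", perKey);

            client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
        }
    }

Deletions take the same shape with Mutation.deletion set instead of column_or_supercolumn; per the validation added above, a single Mutation may carry one or the other, but not both.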
