Author: brandonwilliams
Date: Tue Sep 28 16:31:10 2010
New Revision: 1002245
URL: http://svn.apache.org/viewvc?rev=1002245&view=rev
Log:
Add secondary index ops to stress.py.
Patch by Stu Hood, reviewed by brandonwilliams for CASSANDRA-1531

Modified:
    cassandra/trunk/contrib/py_stress/README.txt
    cassandra/trunk/contrib/py_stress/stress.py

Modified: cassandra/trunk/contrib/py_stress/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/README.txt?rev=1002245&r1=1002244&r2=1002245&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/README.txt (original)
+++ cassandra/trunk/contrib/py_stress/README.txt Tue Sep 28 16:31:10 2010
@@ -33,10 +33,11 @@ There are three different modes of opera
  * inserting (loading test data)
  * reading
  * range slicing (only works with the OrderPreservingPartitioner)
+ * indexed range slicing (works with RandomPartitioner on indexed ColumnFamilies)
 
 Important options:
     -o or --operation
-        Sets the operation mode, one of 'insert', 'read', or 'rangeslice'
+        Sets the operation mode, one of 'insert', 'read', 'rangeslice', or 'indexedrangeslice'
     -n or --num-keys:
         the number of rows to insert/read/slice
     -d or --nodes:

Modified: cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1002245&r1=1002244&r2=1002245&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Tue Sep 28 16:31:10 2010
@@ -65,6 +65,8 @@ parser.add_option('-t', '--threads', typ
                   help="Number of threads/procs to use", default=50)
 parser.add_option('-c', '--columns', type="int", dest="columns",
                   help="Number of columns per key", default=5)
+parser.add_option('-C', '--cardinality', type="int", dest="cardinality",
+                  help="Number of unique values stored in columns", default=50)
 parser.add_option('-d', '--nodes', type="string", dest="nodes",
                   help="Host nodes (comma separated)", default="localhost")
 parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
@@ -79,7 +81,7 @@ parser.add_option('-m', '--unframed', ac
                   help="use unframed transport")
 parser.add_option('-o', '--operation', type="choice", dest="operation",
                   default="insert", choices=('insert', 'read', 'rangeslice',
-                  'multiget'),
+                  'indexedrangeslice', 'multiget'),
                   help="operation to perform")
 parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
                   help="number of super columns per key")
@@ -98,6 +100,9 @@ parser.add_option('-l', '--replication-f
                   help="replication factor to use when creating needed column families")
 parser.add_option('-e', '--consistency-level', type="str", default='ONE',
                   dest="consistency", help="consistency level to use")
+parser.add_option('-x', '--create-index', type="choice",
+                  choices=('keys','keys_bitmap', 'none'), default='none',
+                  dest="index", help="type of index to create on needed column families")
 
 (options, args) = parser.parse_args()
 
@@ -122,6 +127,13 @@ if consistency is None:
     print "%s is not a valid consistency level" % options.consistency
     sys.exit(3)
 
+# generates a list of unique, deterministic values
+def generate_values():
+    values = []
+    for i in xrange(0, options.cardinality):
+        values.append('%d-%s' % (i, md5(str(i)).hexdigest()))
+    return values
+
 def key_generator_gauss():
     fmt = '%0' + str(len(str(total_keys))) + 'd'
     while True:
@@ -152,7 +164,12 @@ def get_client(host='127.0.0.1', port=91
     return client
 
 def make_keyspaces():
-    cfams = [CfDef(keyspace='Keyspace1', name='Standard1'),
+    colms = []
+    if options.index == 'keys':
+        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS)]
+    elif options.index == 'keys_bitmap':
+        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP)]
+    cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms),
              CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')]
     keyspace = KsDef(name='Keyspace1', strategy_class='org.apache.cassandra.locator.SimpleStrategy', replication_factor=options.replication, cf_defs=cfams)
     client = get_client(nodes[0], options.port)
@@ -187,8 +204,8 @@ class Operation(Thread):
 
 class Inserter(Operation):
     def run(self):
-        data = md5(str(get_ident())).hexdigest()
-        columns = [Column('C' + str(j), data, time.time() * 1000000) for j in xrange(columns_per_key)]
+        values = generate_values()
+        columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
         fmt = '%0' + str(len(str(total_keys))) + 'd'
         if 'super' == options.cftype:
             supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
@@ -198,6 +215,10 @@ class Inserter(Operation):
                 cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}}
             else:
                 cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}}
+            # set the correct column values for this row
+            value = values[i % len(values)]
+            for column in columns:
+                column.value = value
             start = time.time()
             try:
                 self.cclient.batch_mutate(cfmap, consistency)
@@ -308,6 +329,45 @@ class RangeSlicer(Operation):
             self.opcounts[self.idx] += 1
             self.keycounts[self.idx] += len(r)
 
+# Each thread queries for a portion of the unique values
+# TODO: all threads start at the same key: implement wrapping, and start
+# from the thread's appointed range
+class IndexedRangeSlicer(Operation):
+    def run(self):
+        fmt = '%0' + str(len(str(total_keys))) + 'd'
+        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
+        values = generate_values()
+        parent = ColumnParent('Standard1')
+        # the number of rows with a particular value and the number of values we should query for
+        expected_per_value = total_keys // len(values)
+        valuebegin = self.range[0] // expected_per_value
+        valuecount = len(self.range) // expected_per_value
+        for valueidx in xrange(valuebegin, valuebegin + valuecount):
+            received = 0
+            start = fmt % 0
+            value = values[valueidx % len(values)]
+            expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)]
+            while received < expected_per_value:
+                clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions)
+                begin = time.time()
+                try:
+                    r = self.cclient.get_indexed_slices(parent, clause, p, consistency)
+                    if not r: raise RuntimeError("No indexed values from offset received:", start)
+                except KeyboardInterrupt:
+                    raise
+                except Exception, e:
+                    if options.ignore:
+                        print e
+                        continue
+                    else:
+                        raise
+                received += len(r)
+                # convert max key found back to an integer, and increment it
+                start = fmt % (1 + max([int(keyslice.key) for keyslice in r]))
+                self.latencies[self.idx] += time.time() - begin
+                self.opcounts[self.idx] += 1
+                self.keycounts[self.idx] += len(r)
+
 
 class MultiGetter(Operation):
     def run(self):
@@ -364,6 +424,8 @@ class OperationFactory:
             return Inserter(i, opcounts, keycounts, latencies)
         elif type == 'rangeslice':
             return RangeSlicer(i, opcounts, keycounts, latencies)
+        elif type == 'indexedrangeslice':
+            return IndexedRangeSlicer(i, opcounts, keycounts, latencies)
         elif type == 'multiget':
             return MultiGetter(i, opcounts, keycounts, latencies)
         else:
@@ -427,6 +489,10 @@ class Stress(object):
         threads = self.create_threads('rangeslice')
         self.run_test(options.file,threads);
 
+    def indexedrangeslice(self):
+        threads = self.create_threads('indexedrangeslice')
+        self.run_test(options.file,threads);
+
     def multiget(self):
         threads = self.create_threads('multiget')
         self.run_test(options.file,threads);
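
A rough usage sketch of the new options (the key count, cardinality value, and the
choice to run against the default localhost node are illustrative, not part of the
patch): first load rows into Standard1 with a KEYS index created on column C1, then
read them back through the index, passing the same --num-keys and --cardinality so
that the slicer's expected_per_value matches what was inserted:

    # populate 100000 rows whose C1 column cycles through 50 indexed values
    python stress.py --operation insert --num-keys 100000 --create-index keys --cardinality 50

    # page through the rows by C1 value via get_indexed_slices
    python stress.py --operation indexedrangeslice --num-keys 100000 --cardinality 50

With these numbers each indexed value should match roughly 100000 / 50 = 2000 rows,
which is the expected_per_value count each IndexedRangeSlicer thread pages toward.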
