Remove py_stress. Patch by brandonwilliams, reviewed by slebresne for CASSANDRA-3914
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257d36e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257d36e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257d36e3 Branch: refs/heads/trunk Commit: 257d36e30fb10f4b85cb2290b3898c8f402f38d0 Parents: ea28f42 Author: Brandon Williams <[email protected]> Authored: Fri Feb 24 09:09:53 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Fri Feb 24 09:09:53 2012 -0600 ---------------------------------------------------------------------- tools/py_stress/README.txt | 67 ----- tools/py_stress/stress.py | 522 --------------------------------------- 2 files changed, 0 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/README.txt ---------------------------------------------------------------------- diff --git a/tools/py_stress/README.txt b/tools/py_stress/README.txt deleted file mode 100644 index d7b202a..0000000 --- a/tools/py_stress/README.txt +++ /dev/null @@ -1,67 +0,0 @@ -stress.py -========= - -Description ------------ - -stress.py is a tool for benchmarking and load testing a Cassandra cluster. - -Prerequisites -------------- - -Any of the following will work: - - * python2.4 w/multiprocessing - * python2.5 w/multiprocessing - * python2.6 (multiprocessing is in the stdlib) - -You can opt not to use multiprocessing and threads will be used instead, but -python's GIL will be the limiting factor, not Cassandra, so the results will not be -accurate. A warning to this effect will be issued each time you run the program. - -Additionally, you will need to generate the thrift bindings for python: run -'ant gen-thrift-py' in the top-level Cassandra directory. - -stress.py will create the keyspace and column families it needs if they do not -exist during the insert operation. 
- -Usage ------ - -There are four different modes of operation: - - * inserting (loading test data) - * reading - * range slicing (only works with the OrderPreservingPartitioner) - * indexed range slicing (works with RandomPartitioner on indexed ColumnFamilies) - -Important options: - -o or --operation: - Sets the operation mode, one of 'insert', 'read', 'rangeslice', or 'indexedrangeslice' - -n or --num-keys: - the number of rows to insert/read/slice - -d or --nodes: - the node(s) to perform the test against. For multiple nodes, supply a - comma-separated list without spaces, ex: cassandra1,cassandra2,cassandra3 - -y or --family-type: - Sets the ColumnFamily type. One of 'regular' or 'super'. If using super, - you probably want to set the -u option also. - -c or --columns: - the number of columns per row, defaults to 5 - -u or --supercolumns: - use the number of supercolumns specified. NOTE: you must set the -y - option appropriately, or this option has no effect. - -g or --get-range-slice-count: - This is only used for the rangeslice operation and will *NOT* work with - the RandomPartitioner. You must set the OrderPreservingPartitioner in your - storage-conf.xml (note that you will need to wipe all existing data - when switching partitioners.) This option sets the number of rows to - slice at a time and defaults to 1000. - -r or --random: - Only used for reads. By default, stress.py will perform reads on rows - with a Gaussian distribution, which will cause some repeats. Setting - this option makes the reads completely random instead. - -i or --progress-interval: - The interval, in seconds, at which progress will be output. - -Remember that you must perform inserts before performing reads or range slices. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/stress.py ---------------------------------------------------------------------- diff --git a/tools/py_stress/stress.py b/tools/py_stress/stress.py deleted file mode 100644 index 319fda3..0000000 --- a/tools/py_stress/stress.py +++ /dev/null @@ -1,522 +0,0 @@ -#!/usr/bin/python -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# expects a Cassandra server to be running and listening on port 9160. -# (read tests expect insert tests to have run first too.) 
- -from __future__ import with_statement - -have_multiproc = False -try: - from multiprocessing import Array as array, Process as Thread - from uuid import uuid1 as get_ident - array('i', 1) # catch "This platform lacks a functioning sem_open implementation" - Thread.isAlive = Thread.is_alive - have_multiproc = True -except ImportError: - from threading import Thread - from thread import get_ident - from array import array -from hashlib import md5 -import time, random, sys, os -from random import randint, gauss -from optparse import OptionParser - -from thrift.transport import TTransport -from thrift.transport import TSocket -from thrift.transport import THttpClient -from thrift.protocol import TBinaryProtocol - -try: - from cassandra import Cassandra - from cassandra.ttypes import * -except ImportError: - # add cassandra directory to sys.path - L = os.path.abspath(__file__).split(os.path.sep)[:-3] - root = os.path.sep.join(L) - _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py') - sys.path.append(os.path.join(_ipath, 'cassandra')) - import Cassandra - from ttypes import * -except ImportError: - print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'" - sys.exit(2) - -try: - from thrift.protocol import fastbinary -except ImportError: - print "WARNING: thrift binary extension not found, benchmark will not be accurate!" 
- -parser = OptionParser() -parser.add_option('-n', '--num-keys', type="int", dest="numkeys", - help="Number of keys", default=1000**2) -parser.add_option('-N', '--skip-keys', type="float", dest="skipkeys", - help="Fraction of keys to skip initially", default=0) -parser.add_option('-t', '--threads', type="int", dest="threads", - help="Number of threads/procs to use", default=50) -parser.add_option('-c', '--columns', type="int", dest="columns", - help="Number of columns per key", default=5) -parser.add_option('-S', '--column-size', type="int", dest="column_size", - help="Size of column values in bytes", default=34) -parser.add_option('-C', '--cardinality', type="int", dest="cardinality", - help="Number of unique values stored in columns", default=50) -parser.add_option('-d', '--nodes', type="string", dest="nodes", - help="Host nodes (comma separated)", default="localhost") -parser.add_option('-D', '--nodefile', type="string", dest="nodefile", - help="File containing list of nodes (one per line)", default=None) -parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1, - help="standard deviation factor") -parser.add_option('-r', '--random', action="store_true", dest="random", - help="use random key generator (stdev will have no effect)") -parser.add_option('-f', '--file', type="string", dest="file", - help="write output to file") -parser.add_option('-p', '--port', type="int", default=9160, dest="port", - help="thrift port") -parser.add_option('-m', '--unframed', action="store_true", dest="unframed", - help="use unframed transport") -parser.add_option('-o', '--operation', type="choice", dest="operation", - default="insert", choices=('insert', 'read', 'rangeslice', - 'indexedrangeslice', 'multiget'), - help="operation to perform") -parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1, - help="number of super columns per key") -parser.add_option('-y', '--family-type', type="choice", dest="cftype", - 
choices=('regular','super'), default='regular', - help="column family type") -parser.add_option('-k', '--keep-going', action="store_true", dest="ignore", - help="ignore errors inserting or reading") -parser.add_option('-i', '--progress-interval', type="int", default=10, - dest="interval", help="progress report interval (seconds)") -parser.add_option('-g', '--keys-per-call', type="int", default=1000, - dest="rangecount", - help="amount of keys to get_range_slices or multiget per call") -parser.add_option('-l', '--replication-factor', type="int", default=1, - dest="replication", - help="replication factor to use when creating needed column families") -parser.add_option('-e', '--consistency-level', type="str", default='ONE', - dest="consistency", help="consistency level to use") -parser.add_option('-x', '--create-index', type="choice", - choices=('keys','keys_bitmap', 'none'), default='none', - dest="index", help="type of index to create on needed column families") - -(options, args) = parser.parse_args() - -total_keys = options.numkeys -n_threads = options.threads -keys_per_thread = total_keys / n_threads -columns_per_key = options.columns -supers_per_key = options.supers -# this allows client to round robin requests directly for -# simple request load-balancing -nodes = options.nodes.split(',') -if options.nodefile != None: - with open(options.nodefile) as f: - nodes = [n.strip() for n in f.readlines() if len(n.strip()) > 0] - -#format string for keys -fmt = '%0' + str(len(str(total_keys))) + 'd' - -# a generator that generates all keys according to a bell curve centered -# around the middle of the keys generated (0..total_keys). Remember that -# about 68% of keys will be within stdev away from the mean and -# about 95% within 2*stdev. 
-stdev = total_keys * options.stdev -mean = total_keys / 2 - -consistency = getattr(ConsistencyLevel, options.consistency, None) -if consistency is None: - print "%s is not a valid consistency level" % options.consistency - sys.exit(3) - -# generates a list of unique, deterministic values -def generate_values(): - values = [] - for i in xrange(0, options.cardinality): - h = md5(str(i)).hexdigest() - values.append(h * int(options.column_size/len(h)) + h[:options.column_size % len(h)]) - return values - -def key_generator_gauss(): - while True: - guess = gauss(mean, stdev) - if 0 <= guess < total_keys: - return fmt % int(guess) - -# a generator that will generate all keys w/ equal probability. this is the -# worst case for caching. -def key_generator_random(): - return fmt % randint(0, total_keys - 1) - -key_generator = key_generator_gauss -if options.random: - key_generator = key_generator_random - - -def get_client(host='127.0.0.1', port=9160): - socket = TSocket.TSocket(host, port) - if options.unframed: - transport = TTransport.TBufferedTransport(socket) - else: - transport = TTransport.TFramedTransport(socket) - protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport) - client = Cassandra.Client(protocol) - client.transport = transport - return client - -def make_keyspaces(): - colms = [] - if options.index == 'keys': - colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS)] - elif options.index == 'keys_bitmap': - colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP)] - cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms), - CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')] - keyspace = KsDef(name='Keyspace1', - strategy_class='org.apache.cassandra.locator.SimpleStrategy', - strategy_options={'replication_factor': str(options.replication)}, - cf_defs=cfams) - client = get_client(nodes[0], options.port) - client.transport.open() - try: - 
client.system_add_keyspace(keyspace) - print "Created keyspaces. Sleeping %ss for propagation." % len(nodes) - time.sleep(len(nodes)) - except InvalidRequestException, e: - print e.why - client.transport.close() - -class Operation(Thread): - def __init__(self, i, opcounts, keycounts, latencies): - Thread.__init__(self) - # generator of the keys to be used - self.range = xrange(int(keys_per_thread * (i + options.skipkeys)), - keys_per_thread * (i + 1)) - # we can't use a local counter, since that won't be visible to the parent - # under multiprocessing. instead, the parent passes a "opcounts" array - # and an index that is our assigned counter. - self.idx = i - self.opcounts = opcounts - # similarly, a shared array for latency and key totals - self.latencies = latencies - self.keycounts = keycounts - # random host for pseudo-load-balancing - [hostname] = random.sample(nodes, 1) - # open client - self.cclient = get_client(hostname, options.port) - self.cclient.transport.open() - self.cclient.set_keyspace('Keyspace1') - -class Inserter(Operation): - def run(self): - values = generate_values() - columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)] - if 'super' == options.cftype: - supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)] - for i in self.range: - key = fmt % i - if 'super' == options.cftype: - cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}} - else: - cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}} - # set the correct column values for this row - value = values[i % len(values)] - for column in columns: - column.value = value - start = time.time() - try: - self.cclient.batch_mutate(cfmap, consistency) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - start - self.opcounts[self.idx] += 1 - 
self.keycounts[self.idx] += 1 - - -class Reader(Operation): - def run(self): - p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) - if 'super' == options.cftype: - for i in xrange(keys_per_thread): - key = key_generator() - for j in xrange(supers_per_key): - parent = ColumnParent('Super1', 'S' + str(j)) - start = time.time() - try: - r = self.cclient.get_slice(key, parent, p, consistency) - if not r: raise RuntimeError("Key %s not found" % key) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - start - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += 1 - else: - parent = ColumnParent('Standard1') - for i in xrange(keys_per_thread): - key = key_generator() - start = time.time() - try: - r = self.cclient.get_slice(key, parent, p, consistency) - if not r: raise RuntimeError("Key %s not found" % key) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - start - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += 1 - -class RangeSlicer(Operation): - def run(self): - begin = self.range[0] - end = self.range[-1] - current = begin - last = current + options.rangecount - p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) - if 'super' == options.cftype: - while current < end: - keyrange = KeyRange(fmt % current, fmt % last, count = options.rangecount) - res = [] - for j in xrange(supers_per_key): - parent = ColumnParent('Super1', 'S' + str(j)) - begin = time.time() - try: - res = self.cclient.get_range_slices(parent, p, keyrange, consistency) - if not res: raise RuntimeError("Key %s not found" % key) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - begin - self.opcounts[self.idx] += 1 - current += len(r) + 1 - last = 
current + len(r) + 1 - self.keycounts[self.idx] += len(r) - else: - parent = ColumnParent('Standard1') - while current < end: - start = fmt % current - finish = fmt % last - keyrange = KeyRange(start, finish, count = options.rangecount) - begin = time.time() - try: - r = self.cclient.get_range_slices(parent, p, keyrange, consistency) - if not r: raise RuntimeError("Range not found:", start, finish) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - print start, finish - raise - current += len(r) + 1 - last = current + len(r) + 1 - self.latencies[self.idx] += time.time() - begin - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += len(r) - -# Each thread queries for a portion of the unique values -# TODO: all threads start at the same key: implement wrapping, and start -# from the thread's appointed range -class IndexedRangeSlicer(Operation): - def run(self): - p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) - values = generate_values() - parent = ColumnParent('Standard1') - # the number of rows with a particular value and the number of values we should query for - expected_per_value = total_keys // len(values) - valuebegin = self.range[0] // expected_per_value - valuecount = len(self.range) // expected_per_value - for valueidx in xrange(valuebegin, valuebegin + valuecount): - received = 0 - start = fmt % 0 - value = values[valueidx % len(values)] - expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)] - while received < expected_per_value: - clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions) - begin = time.time() - try: - r = self.cclient.get_indexed_slices(parent, clause, p, consistency) - if not r: raise RuntimeError("No indexed values from offset received:", start) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - continue - else: - raise - received += len(r) - # convert 
max key found back to an integer, and increment it - start = fmt % (1 + max([int(keyslice.key) for keyslice in r])) - self.latencies[self.idx] += time.time() - begin - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += len(r) - - -class MultiGetter(Operation): - def run(self): - p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) - offset = self.idx * keys_per_thread - count = (((self.idx+1) * keys_per_thread) - offset) / options.rangecount - if 'super' == options.cftype: - for x in xrange(count): - keys = [key_generator() for i in xrange(offset, offset + options.rangecount)] - for j in xrange(supers_per_key): - parent = ColumnParent('Super1', 'S' + str(j)) - start = time.time() - try: - r = self.cclient.multiget_slice(keys, parent, p, consistency) - if not r: raise RuntimeError("Keys %s not found" % keys) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - start - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += len(keys) - offset += options.rangecount - else: - parent = ColumnParent('Standard1') - for x in xrange(count): - keys = [key_generator() for i in xrange(offset, offset + options.rangecount)] - start = time.time() - try: - r = self.cclient.multiget_slice(keys, parent, p, consistency) - if not r: raise RuntimeError("Keys %s not found" % keys) - except KeyboardInterrupt: - raise - except Exception, e: - if options.ignore: - print e - else: - raise - self.latencies[self.idx] += time.time() - start - self.opcounts[self.idx] += 1 - self.keycounts[self.idx] += len(keys) - offset += options.rangecount - - -class OperationFactory: - @staticmethod - def create(type, i, opcounts, keycounts, latencies): - if type == 'read': - return Reader(i, opcounts, keycounts, latencies) - elif type == 'insert': - return Inserter(i, opcounts, keycounts, latencies) - elif type == 'rangeslice': - return RangeSlicer(i, opcounts, keycounts, 
latencies) - elif type == 'indexedrangeslice': - return IndexedRangeSlicer(i, opcounts, keycounts, latencies) - elif type == 'multiget': - return MultiGetter(i, opcounts, keycounts, latencies) - else: - raise RuntimeError, 'Unsupported op!' - - -class Stress(object): - opcounts = array('i', [0] * n_threads) - latencies = array('d', [0] * n_threads) - keycounts = array('i', [0] * n_threads) - - def create_threads(self,type): - threads = [] - for i in xrange(n_threads): - th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies) - threads.append(th) - th.start() - return threads - - def run_test(self,filename,threads): - start_t = time.time() - if filename: - outf = open(filename,'w') - else: - outf = sys.stdout - outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n') - epoch = total = old_total = latency = keycount = old_keycount = old_latency = 0 - epoch_intervals = (options.interval * 10) # 1 epoch = 1 tenth of a second - terminate = False - while not terminate: - time.sleep(0.1) - if not [th for th in threads if th.isAlive()]: - terminate = True - epoch = epoch + 1 - if terminate or epoch > epoch_intervals: - epoch = 0 - old_total, old_latency, old_keycount = total, latency, keycount - total = sum(self.opcounts[th.idx] for th in threads) - latency = sum(self.latencies[th.idx] for th in threads) - keycount = sum(self.keycounts[th.idx] for th in threads) - opdelta = total - old_total - keydelta = keycount - old_keycount - delta_latency = latency - old_latency - if opdelta > 0: - delta_formatted = (delta_latency / opdelta) - else: - delta_formatted = 'NaN' - elapsed_t = int(time.time() - start_t) - outf.write('%d,%d,%d,%s,%d\n' - % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t)) - - def insert(self): - threads = self.create_threads('insert') - self.run_test(options.file,threads); - - def read(self): - threads = self.create_threads('read') - 
self.run_test(options.file,threads); - - def rangeslice(self): - threads = self.create_threads('rangeslice') - self.run_test(options.file,threads); - - def indexedrangeslice(self): - threads = self.create_threads('indexedrangeslice') - self.run_test(options.file,threads); - - def multiget(self): - threads = self.create_threads('multiget') - self.run_test(options.file,threads); - -stresser = Stress() -benchmark = getattr(stresser, options.operation, None) -if not have_multiproc: - print """WARNING: multiprocessing not present, threading will be used. - Benchmark may not be accurate!""" -if options.operation == 'insert': - make_keyspaces() -benchmark()
