Author: jbellis
Date: Thu Nov 12 02:20:41 2009
New Revision: 835196
URL: http://svn.apache.org/viewvc?rev=835196&view=rev
Log:
various improvements to stress.py (see issue comments). patch by Scott White;
reviewed by Brandon Williams for CASSANDRA-542
Modified:
incubator/cassandra/trunk/test/system/stress.py
Modified: incubator/cassandra/trunk/test/system/stress.py
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/stress.py?rev=835196&r1=835195&r2=835196&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/stress.py (original)
+++ incubator/cassandra/trunk/test/system/stress.py Thu Nov 12 02:20:41 2009
@@ -19,14 +19,15 @@
# (read tests expect insert tests to have run first too.)
try:
- from multiprocessing import Process as Thread
+ from multiprocessing import Array as array, Process as Thread
from uuid import uuid1 as get_ident
Thread.isAlive = Thread.is_alive
except ImportError:
from threading import Thread
from thread import get_ident
+ from array import array
from hashlib import md5
-import time
+import time, random
from random import randint, gauss
from . import get_client, root, CassandraTester
@@ -37,6 +38,9 @@
N_THREADS = 50
KEYS_PER_THREAD = TOTAL_KEYS / N_THREADS
COLUMNS_PER_KEY = 5
+# this allows client to round robin requests directly for
+# simple request load-balancing
+NODES = ["localhost"]
# a generator that generates all keys according to a bell curve centered
# around the middle of the keys generated (0..TOTAL_KEYS). Remember that
@@ -54,76 +58,77 @@
# worst case for caching.
# key_generator = lambda: randint(0, TOTAL_KEYS - 1)
-
-class Inserter(Thread):
- def __init__(self, i):
+class Operation(Thread):
+ def __init__(self, i, counts):
Thread.__init__(self)
self.range = xrange(KEYS_PER_THREAD * i, KEYS_PER_THREAD * (i + 1))
- self.count = 0
+ self.idx = i
+ self.counts = counts
+ [hostname] = random.sample(NODES, 1)
+ self.cclient = get_client(host=hostname,port=9160)
+ self.cclient.transport.open()
+class Inserter(Operation):
def run(self):
- client = get_client(port=9160)
- client.transport.open()
data = md5(str(get_ident())).hexdigest()
columns = [Column(chr(ord('A') + j), data, 0) for j in
xrange(COLUMNS_PER_KEY)]
for i in self.range:
key = str(i)
cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in
columns]}
- client.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
- self.count += 1
-
-
-class Reader(Thread):
- def __init__(self):
- Thread.__init__(self)
- self.count = 0
+ self.cclient.batch_insert('Keyspace1', key, cfmap,
ConsistencyLevel.ONE)
+ self.counts[self.idx]=self.counts[self.idx]+1
+class Reader(Operation):
def run(self):
- client = get_client(port=9160)
- client.transport.open()
parent = ColumnParent('Standard1')
p = SlicePredicate(slice_range=SliceRange('', '', False,
COLUMNS_PER_KEY))
for i in xrange(KEYS_PER_THREAD):
key = str(key_generator())
- client.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
- self.count += 1
+ self.cclient.get_slice('Keyspace1', key, parent, p,
ConsistencyLevel.ONE)
+ self.counts[self.idx]=self.counts[self.idx]+1
+class OperationFactory:
+ @staticmethod
+ def create(type,i,counts):
+ if type == 'read':
+ return Reader(i, counts)
+ elif type == 'insert':
+ return Inserter(i, counts)
+ else:
+ raise RuntimeError, 'Unsupported op!'
class Stress(CassandraTester):
runserver = False
+ counts = array('i', [0]*N_THREADS)
- def insert(self):
+ def create_threads(self,type):
threads = []
for i in xrange(N_THREADS):
- th = Inserter(i)
+ th = OperationFactory.create(type,i, self.counts)
threads.append(th)
th.start()
+ return threads
+ def run_test(self,filename,threads):
+ start_t = time.time()
+ file(filename,'w').write('total,interval_op_rate,elapsed_time\n')
total = old_total = 0
while True:
- time.sleep(10)
+ interval = 10
+ time.sleep(interval)
old_total = total
- total = sum(th.count for th in threads)
+ total = sum(self.counts[th.idx] for th in threads)
delta = total - old_total
- file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta /
10))
+ elapsed_t = int(time.time()-start_t)
+ file(filename, 'a').write('%d,%d,%d\n' % (total, delta /
interval,elapsed_t))
if not [th for th in threads if th.isAlive()]:
- file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
break
- def read(self):
- threads = []
- for i in xrange(N_THREADS):
- th = Reader()
- threads.append(th)
- th.start()
+ def insert(self):
+ threads = self.create_threads('insert')
+ self.run_test("/tmp/progress_insert",threads);
- total = old_total = 0
- while True:
- time.sleep(10)
- old_total = total
- total = sum(th.count for th in threads)
- delta = total - old_total
- file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta /
10))
- if not [th for th in threads if th.isAlive()]:
- file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
- break
+ def read(self):
+ threads = self.create_threads('read')
+ self.run_test("/tmp/progress_reads",threads);
+