Author: jbellis
Date: Tue Sep 15 19:55:46 2009
New Revision: 815459
URL: http://svn.apache.org/viewvc?rev=815459&view=rev
Log:
add read testing to stress.py; make stress.py modestly configurable with minor
editing
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-442
Modified:
incubator/cassandra/trunk/test/system/stress.py
Modified: incubator/cassandra/trunk/test/system/stress.py
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/stress.py?rev=815459&r1=815458&r2=815459&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/stress.py (original)
+++ incubator/cassandra/trunk/test/system/stress.py Tue Sep 15 19:55:46 2009
@@ -14,37 +14,96 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# PYTHONPATH=test nosetests --tests=system.stress:Stress.ten_million_inserts
+# PYTHONPATH=test nosetests --tests=system.stress:Stress.[insert|read]
+# expects a Cassandra server to be running and listening on port 9160.
+# (read tests expect insert tests to have run first too.)
from hashlib import md5
from threading import Thread
from thread import get_ident
import time
+from random import randint, gauss
from . import get_client, root, CassandraTester
from ttypes import *
+
+TOTAL_KEYS = 1000**2
+N_THREADS = 50
+KEYS_PER_THREAD = TOTAL_KEYS / N_THREADS
+COLUMNS_PER_KEY = 5
+
+# key_generator() returns one random key per call, drawn from a bell curve
+# centered on the middle of the key range (0..TOTAL_KEYS). Remember that
+# about 68% of keys will be within STDEV of the mean and
+# about 95% within 2*STDEV.
+STDEV = 3000
+MEAN = TOTAL_KEYS / 2
+def key_generator():
+    while True:
+        guess = gauss(MEAN, STDEV)
+        if 0 <= guess < TOTAL_KEYS:
+            return int(guess)  # gauss() yields a float; inserted keys are str(int)
+
+# an alternative key generator that picks every key with equal probability.
+# this is the worst case for caching.
+# key_generator = lambda: randint(0, TOTAL_KEYS - 1)
+
+
class Inserter(Thread):
+ def __init__(self, i):
+ Thread.__init__(self)
+ self.range = xrange(KEYS_PER_THREAD * i, KEYS_PER_THREAD * (i + 1))
+
def run(self):
- id = get_ident()
+ client = get_client(port=9160)
+ client.transport.open()
+ data = md5(str(get_ident())).hexdigest()
+ columns = [Column(chr(ord('A') + j), data, 0) for j in
xrange(COLUMNS_PER_KEY)]
self.count = 0
+ for i in self.range:
+ key = str(i)
+ cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in
columns]}
+ client.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
+ self.count += 1
+
+
+class Reader(Thread):
+ def run(self):
client = get_client(port=9160)
client.transport.open()
- for i in xrange(0, 200):
- data = md5(str(i)).hexdigest()
- for j in xrange(0, 1000):
- key = '%s.%s.%s' % (time.time(), id, j)
- client.insert('Keyspace1', key, ColumnPath('Standard1',
column='A'), data, i, 1)
- client.insert('Keyspace1', key, ColumnPath('Standard1',
column='B'), data, i, 1)
- self.count += 1
+ parent = ColumnParent('Standard1')
+ p = SlicePredicate(slice_range=SliceRange('', '', False,
COLUMNS_PER_KEY))
+ for self.count in xrange(KEYS_PER_THREAD):
+ key = str(key_generator())
+ client.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
+
class Stress(CassandraTester):
runserver = False
- def ten_million_inserts(self):
+ def insert(self):
+ threads = []
+ for i in xrange(N_THREADS):
+ th = Inserter(i)
+ threads.append(th)
+ th.start()
+
+ total = old_total = 0
+ while True:
+ time.sleep(10)
+ old_total = total
+ total = sum(th.count for th in threads)
+ delta = total - old_total
+ file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta /
10))
+ if not [th for th in threads if th.isAlive()]:
+ file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
+ break
+
+ def read(self):
threads = []
- for i in xrange(0, 50):
- th = Inserter()
+ for i in xrange(N_THREADS):
+ th = Reader()
threads.append(th)
th.start()