Author: jbellis
Date: Tue Sep 15 19:55:46 2009
New Revision: 815459

URL: http://svn.apache.org/viewvc?rev=815459&view=rev
Log:
add read testing to stress.py; make stress.py modestly configurable by editing a few constants at the top of the file
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-442

Modified:
    incubator/cassandra/trunk/test/system/stress.py

Modified: incubator/cassandra/trunk/test/system/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/stress.py?rev=815459&r1=815458&r2=815459&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/stress.py (original)
+++ incubator/cassandra/trunk/test/system/stress.py Tue Sep 15 19:55:46 2009
@@ -14,37 +14,96 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# PYTHONPATH=test nosetests --tests=system.stress:Stress.ten_million_inserts
+# PYTHONPATH=test nosetests --tests=system.stress:Stress.insert  (or Stress.read)
+# expects a Cassandra server to be running and listening on port 9160.
+# (the read test expects the insert test to have been run first.)
 
 from hashlib import md5
 from threading import Thread
 from thread import get_ident
 import time
+from random import randint, gauss
 
 from . import get_client, root, CassandraTester
 from ttypes import *
 
+
+TOTAL_KEYS = 1000**2
+N_THREADS = 50
+KEYS_PER_THREAD = TOTAL_KEYS / N_THREADS  # keep TOTAL_KEYS divisible by N_THREADS or the tail keys are never inserted
+COLUMNS_PER_KEY = 5
+
+# a function that returns one random key, drawn from a bell curve centered
+# around the middle of the inserted keys (0..TOTAL_KEYS-1).  Remember that
+# about 68% of keys will be within STDEV away from the mean and
+# about 95% within 2*STDEV.
+STDEV = 3000
+MEAN = TOTAL_KEYS / 2
+def key_generator():
+    while True:
+        guess = gauss(MEAN, STDEV)
+        if 0 <= guess < TOTAL_KEYS:
+            return int(guess)  # keys were inserted as str(int); a float key never matches
+
+# alternatively, a function that returns any key w/ equal probability.  this is the
+# worst case for caching.
+# key_generator = lambda: randint(0, TOTAL_KEYS - 1)
+
+
 class Inserter(Thread):
+    def __init__(self, i):
+        Thread.__init__(self)
+        self.range = xrange(KEYS_PER_THREAD * i, KEYS_PER_THREAD * (i + 1))  # this thread's contiguous slice of keys
+
     def run(self):
-        id = get_ident()
+        client = get_client(port=9160)
+        client.transport.open()
+        data = md5(str(get_ident())).hexdigest()
+        columns = [Column(chr(ord('A') + j), data, 0) for j in xrange(COLUMNS_PER_KEY)]
         self.count = 0
+        cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}  # identical for every key; build once
+        for i in self.range:
+            key = str(i)
+            client.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
+            self.count += 1
+
+
+class Reader(Thread):
+    def run(self):
         client = get_client(port=9160)
         client.transport.open()
-        for i in xrange(0, 200):
-            data = md5(str(i)).hexdigest()
-            for j in xrange(0, 1000):
-                key = '%s.%s.%s' % (time.time(), id, j)
-                client.insert('Keyspace1', key, ColumnPath('Standard1', column='A'), data, i, 1)
-                client.insert('Keyspace1', key, ColumnPath('Standard1', column='B'), data, i, 1)
-                self.count += 1
+        parent = ColumnParent('Standard1')
+        p = SlicePredicate(slice_range=SliceRange('', '', False, COLUMNS_PER_KEY))
+        # count runs 1..KEYS_PER_THREAD so the final total equals the reads issued
+        for self.count in xrange(1, KEYS_PER_THREAD + 1):
+            client.get_slice('Keyspace1', str(key_generator()), parent, p, ConsistencyLevel.ONE)
+
 
 class Stress(CassandraTester):
     runserver = False
 
-    def ten_million_inserts(self):
+    def insert(self):
+        threads = []
+        for i in xrange(N_THREADS):
+            th = Inserter(i)
+            threads.append(th)
+            th.start()
+
+        total = 0
+        while True:
+            time.sleep(10)
+            # sample liveness *before* summing, so once no thread is alive the sum is final
+            alive = any(th.isAlive() for th in threads)
+            old_total, total = total, sum(th.count for th in threads)
+            delta = total - old_total
+            file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta / 10))
+            if not alive:
+                file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
+                break
+
+    def read(self):
         threads = []
-        for i in xrange(0, 50):
-            th = Inserter()
+        for i in xrange(N_THREADS):
+            th = Reader()
             threads.append(th)
             th.start()
 


Reply via email to