Author: jbellis
Date: Thu Nov 12 02:20:41 2009
New Revision: 835196

URL: http://svn.apache.org/viewvc?rev=835196&view=rev
Log:
various improvements to stress.py (see issue comments).  patch by Scott White; 
reviewed by Brandon Williams for CASSANDRA-542

Modified:
    incubator/cassandra/trunk/test/system/stress.py

Modified: incubator/cassandra/trunk/test/system/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/stress.py?rev=835196&r1=835195&r2=835196&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/stress.py (original)
+++ incubator/cassandra/trunk/test/system/stress.py Thu Nov 12 02:20:41 2009
@@ -19,14 +19,15 @@
 # (read tests expect insert tests to have run first too.)
 
 try:
-    from multiprocessing import Process as Thread
+    from multiprocessing import Array as array, Process as Thread
     from uuid import uuid1 as get_ident
     Thread.isAlive = Thread.is_alive
 except ImportError:
     from threading import Thread
     from thread import get_ident
+    from array import array
 from hashlib import md5
-import time
+import time, random
 from random import randint, gauss
 
 from . import get_client, root, CassandraTester
@@ -37,6 +38,9 @@
 N_THREADS = 50
 KEYS_PER_THREAD = TOTAL_KEYS / N_THREADS
 COLUMNS_PER_KEY = 5
+# this allows client to round robin requests directly for
+# simple request load-balancing
+NODES = ["localhost"]
 
 # a generator that generates all keys according to a bell curve centered
 # around the middle of the keys generated (0..TOTAL_KEYS).  Remember that
@@ -54,76 +58,77 @@
 # worst case for caching.
 # key_generator = lambda: randint(0, TOTAL_KEYS - 1)
 
-
-class Inserter(Thread):
-    def __init__(self, i):
+class Operation(Thread):
+    def __init__(self, i, counts):
         Thread.__init__(self)
         self.range = xrange(KEYS_PER_THREAD * i, KEYS_PER_THREAD * (i + 1))
-        self.count = 0
+        self.idx = i
+        self.counts = counts
+        [hostname] = random.sample(NODES, 1)
+        self.cclient = get_client(host=hostname,port=9160)
+        self.cclient.transport.open()
 
+class Inserter(Operation):
     def run(self):
-        client = get_client(port=9160)
-        client.transport.open()
         data = md5(str(get_ident())).hexdigest()
         columns = [Column(chr(ord('A') + j), data, 0) for j in xrange(COLUMNS_PER_KEY)]
         for i in self.range:
             key = str(i)
             cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}
-            client.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
-            self.count += 1
-
-
-class Reader(Thread):
-    def __init__(self):
-        Thread.__init__(self)
-        self.count = 0
+            self.cclient.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
+            self.counts[self.idx]=self.counts[self.idx]+1
 
+class Reader(Operation):
     def run(self):
-        client = get_client(port=9160)
-        client.transport.open()
         parent = ColumnParent('Standard1')
         p = SlicePredicate(slice_range=SliceRange('', '', False, COLUMNS_PER_KEY))
         for i in xrange(KEYS_PER_THREAD):
             key = str(key_generator())
-            client.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
-            self.count += 1
+            self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
+            self.counts[self.idx]=self.counts[self.idx]+1
 
+class OperationFactory:
+    @staticmethod
+    def create(type,i,counts):
+      if type == 'read':
+        return Reader(i, counts)
+      elif type == 'insert':
+        return Inserter(i, counts)
+      else:
+        raise RuntimeError, 'Unsupported op!'
 
 class Stress(CassandraTester):
     runserver = False
+    counts = array('i', [0]*N_THREADS)
 
-    def insert(self):
+    def create_threads(self,type):
         threads = []
         for i in xrange(N_THREADS):
-            th = Inserter(i)
+            th = OperationFactory.create(type,i, self.counts)
             threads.append(th)
             th.start()
+        return threads
 
+    def run_test(self,filename,threads):
+        start_t = time.time()
+        file(filename,'w').write('total,interval_op_rate,elapsed_time\n')
         total = old_total = 0
         while True:
-            time.sleep(10)
+            interval = 10
+            time.sleep(interval)
             old_total = total
-            total = sum(th.count for th in threads)
+            total = sum(self.counts[th.idx] for th in threads)
             delta = total - old_total
-            file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta / 10))
+            elapsed_t = int(time.time()-start_t)
+            file(filename, 'a').write('%d,%d,%d\n' % (total, delta / interval,elapsed_t))
             if not [th for th in threads if th.isAlive()]:
-                file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
                 break
 
-    def read(self):
-        threads = []
-        for i in xrange(N_THREADS):
-            th = Reader()
-            threads.append(th)
-            th.start()
+    def insert(self):
+        threads = self.create_threads('insert')
+        self.run_test("/tmp/progress_insert",threads);
 
-        total = old_total = 0
-        while True:
-            time.sleep(10)
-            old_total = total
-            total = sum(th.count for th in threads)
-            delta = total - old_total
-            file('/tmp/progress', 'w').write('%d at %d/s\n' % (total, delta / 10))
-            if not [th for th in threads if th.isAlive()]:
-                file('/tmp/progress', 'w').write('done -- %s\n' % str(total))
-                break
+    def read(self):
+        threads = self.create_threads('read')
+        self.run_test("/tmp/progress_reads",threads);
+        


Reply via email to