Author: jbellis
Date: Tue Feb 16 20:44:56 2010
New Revision: 910681

URL: http://svn.apache.org/viewvc?rev=910681&view=rev
Log:
add range query stress testing.  patch by Brandon Williams; reviewed by jbellis for CASSANDRA-765

Modified:
    incubator/cassandra/trunk/contrib/py_stress/stress.py

Modified: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=910681&r1=910680&r2=910681&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (original)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Tue Feb 16 20:44:56 2010
@@ -74,7 +74,7 @@
 parser.add_option('-m', '--framed', action="store_true", dest="framed",
                   help="use framed transport")
 parser.add_option('-o', '--operation', type="choice", dest="operation",
-                  default="insert", choices=('insert', 'read'),
+                  default="insert", choices=('insert', 'read', 'rangeslice'),
                   help="operation to perform")
 parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
                   help="number of super columns per key")
@@ -85,6 +85,9 @@
                   help="ignore errors inserting or reading")
 parser.add_option('-i', '--progress-interval', type="int", default=10,
                   dest="interval", help="progress report interval (seconds)")
+parser.add_option('-g', '--get-range-slice-count', type="int", default=1000,
+                  dest="rangecount",
+                  help="amount of keys to get_range_slice per call")
 
 (options, args) = parser.parse_args()
  
@@ -105,14 +108,17 @@
 mean = total_keys / 2
 
 def key_generator_gauss():
+    fmt = '%0' + str(len(str(total_keys))) + 'd'
     while True:
         guess = gauss(mean, stdev)
         if 0 <= guess < total_keys:
-            return int(guess)
+            return fmt % int(guess)
     
 # a generator that will generate all keys w/ equal probability.  this is the
 # worst case for caching.
-key_generator_random = lambda: randint(0, total_keys - 1)
+def key_generator_random():
+    fmt = '%0' + str(len(str(total_keys))) + 'd'
+    return fmt % randint(0, total_keys - 1)
 
 key_generator = key_generator_gauss
 if options.random:
@@ -154,10 +160,11 @@
     def run(self):
         data = md5(str(get_ident())).hexdigest()
         columns = [Column(chr(ord('A') + j), data, 0) for j in xrange(columns_per_key)]
+        fmt = '%0' + str(len(str(total_keys))) + 'd'
         if 'super' == options.cftype:
             supers = [SuperColumn(chr(ord('A') + j), columns) for j in xrange(supers_per_key)]
         for i in self.range:
-            key = str(i)
+            key = fmt % i
             if 'super' == options.cftype:
                 cfmap= {'Super1': [ColumnOrSuperColumn(super_column=s) for s in supers]}
             else:
@@ -181,7 +188,7 @@
         p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
         if 'super' == options.cftype:
             for i in xrange(keys_per_thread):
-                key = str(key_generator())
+                key = key_generator()
                 for j in xrange(supers_per_key):
                     parent = ColumnParent('Super1', chr(ord('A') + j))
                     start = time.time()
@@ -200,7 +207,7 @@
         else:
             parent = ColumnParent('Standard1')
             for i in xrange(keys_per_thread):
-                key = str(key_generator())
+                key = key_generator()
                 start = time.time()
                 try:
                     r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
@@ -215,6 +222,57 @@
                 self.latencies[self.idx] += time.time() - start
                 self.counts[self.idx] += 1
 
+class RangeSlicer(Operation):
+    def run(self):
+        begin = self.range[0]
+        end = self.range[-1]
+        current = begin
+        last = current + options.rangecount
+        fmt = '%0' + str(len(str(total_keys))) + 'd'
+        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
+        if 'super' == options.cftype:
+            while current < end:
+                start = fmt % current
+                finish = fmt % last
+                res = []
+                for j in xrange(supers_per_key):
+                    parent = ColumnParent('Super1', chr(ord('A') + j)) 
+                    begin = time.time()
+                    try:
+                        res = self.cclient.get_range_slice('Keyspace1', parent, p, start,finish, options.rangecount, ConsistencyLevel.ONE)
+                        if not res: raise RuntimeError("Key %s not found" % key)
+                    except KeyboardInterrupt:
+                        raise
+                    except Exception, e:
+                        if options.ignore:
+                            print e
+                        else:
+                            raise
+                    self.latencies[self.idx] += time.time() - begin
+                    self.counts[self.idx] += 1
+                current += len(r) + 1
+                last += len(r)
+        else:
+            parent = ColumnParent('Standard1')
+            while current < end:
+                start = fmt % current 
+                finish = fmt % last
+                begin = time.time()
+                try:
+                    r = self.cclient.get_range_slice('Keyspace1', parent, p, start, finish, options.rangecount, ConsistencyLevel.ONE)
+                    if not r: raise RuntimeError("Range not found:", start, finish)
+                except KeyboardInterrupt:
+                    raise
+                except Exception, e:
+                    if options.ignore:
+                        print e
+                    else:
+                        raise
+                current += len(r) + 1
+                last += len(r)
+                self.latencies[self.idx] += time.time() - begin
+                self.counts[self.idx] += 1
+
 
 class OperationFactory:
     @staticmethod
@@ -223,6 +281,8 @@
             return Reader(i, counts, latencies)
         elif type == 'insert':
             return Inserter(i, counts, latencies)
+        elif type == 'rangeslice':
+            return RangeSlicer(i, counts, latencies)
         else:
             raise RuntimeError, 'Unsupported op!'
 
@@ -269,6 +329,9 @@
         threads = self.create_threads('read')
         self.run_test(options.file,threads);
         
+    def rangeslice(self):
+        threads = self.create_threads('rangeslice')
+        self.run_test(options.file,threads);
 
 stresser = Stress()
 benchmark = getattr(stresser, options.operation, None)


Reply via email to