Author: jbellis
Date: Wed Oct 28 22:07:50 2009
New Revision: 830776
URL: http://svn.apache.org/viewvc?rev=830776&view=rev
Log:
improve stress.py client-side throughput by using the accelerated thrift
protocol and multiprocessing instead of threads. patch by Brandon Williams;
reviewed by jbellis for CASSANDRA-514
Modified:
incubator/cassandra/trunk/test/system/__init__.py
incubator/cassandra/trunk/test/system/stress.py
Modified: incubator/cassandra/trunk/test/system/__init__.py
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/__init__.py?rev=830776&r1=830775&r2=830776&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/__init__.py (original)
+++ incubator/cassandra/trunk/test/system/__init__.py Wed Oct 28 22:07:50 2009
@@ -33,7 +33,7 @@
def get_client(host='127.0.0.1', port=9170):
socket = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(socket)
- protocol = TBinaryProtocol.TBinaryProtocol(transport)
+ protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
client = Cassandra.Client(protocol)
client.transport = transport
return client
Modified: incubator/cassandra/trunk/test/system/stress.py
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/stress.py?rev=830776&r1=830775&r2=830776&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/stress.py (original)
+++ incubator/cassandra/trunk/test/system/stress.py Wed Oct 28 22:07:50 2009
@@ -18,9 +18,14 @@
# expects a Cassandra server to be running and listening on port 9160.
# (read tests expect insert tests to have run first too.)
+try:
+ from multiprocessing import Process as Thread
+ from uuid import uuid1 as get_ident
+ Thread.isAlive = Thread.is_alive
+except ImportError:
+ from threading import Thread
+ from thread import get_ident
from hashlib import md5
-from threading import Thread
-from thread import get_ident
import time
from random import randint, gauss
@@ -37,13 +42,13 @@
# around the middle of the keys generated (0..TOTAL_KEYS). Remember that
# about 68% of keys will be within STDEV away from the mean and
# about 95% within 2*STDEV.
-STDEV = 3000
+STDEV = TOTAL_KEYS * 0.3
MEAN = TOTAL_KEYS / 2
def key_generator():
while True:
guess = gauss(MEAN, STDEV)
if 0 <= guess < TOTAL_KEYS:
- return guess
+ return int(guess)
# a generator that will generate all keys w/ equal probability. this is the
# worst case for caching.
@@ -54,13 +59,13 @@
def __init__(self, i):
Thread.__init__(self)
self.range = xrange(KEYS_PER_THREAD * i, KEYS_PER_THREAD * (i + 1))
+ self.count = 0
def run(self):
client = get_client(port=9160)
client.transport.open()
data = md5(str(get_ident())).hexdigest()
columns = [Column(chr(ord('A') + j), data, 0) for j in
xrange(COLUMNS_PER_KEY)]
- self.count = 0
for i in self.range:
key = str(i)
cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in
columns]}
@@ -69,14 +74,19 @@
class Reader(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.count = 0
+
def run(self):
client = get_client(port=9160)
client.transport.open()
parent = ColumnParent('Standard1')
p = SlicePredicate(slice_range=SliceRange('', '', False,
COLUMNS_PER_KEY))
- for self.count in xrange(KEYS_PER_THREAD):
+ for i in xrange(KEYS_PER_THREAD):
key = str(key_generator())
client.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
+ self.count += 1
class Stress(CassandraTester):