Alright,

What I'm observing shows better with bigger columns, so I've slightly modified
the stress.py test so that it inserts columns of 50K bytes (I attach the
modified stress.py for reference, but it really just reads 50000 bytes from
/dev/zero and uses that as the column data. I also added a sleep to the
inserts, otherwise Cassandra dies under the insertion load :)).

I'm using 0.6-beta2 from the Cassandra website, and I've given Cassandra 1.5G
of RAM just in case.

I've inserted 1000 rows of 100 columns each
(python stress.py -t 2 -n 1000 -c 100 -i 5).
If I read, I get roughly the same number of rows per second whether I read the
whole row (python stress.py -t 10 -n 1000 -o read -r -c 100) or only the first
column (python stress.py -t 10 -n 1000 -o read -r -c 1). And that's fewer than
10 rows per second.

So sure, when I read the whole row, that's almost 1000 columns per second,
which at 50KB a column is roughly 50MB/s of throughput, which is quite good.
But when I read only the first column, I get 10 columns per second, that is
500KB/s, which is less good. Now, from what I've understood so far, Cassandra
doesn't deserialize the whole row to read a single column (I'm not using super
columns here), so I don't understand those numbers.

Plus, if I insert the same data but 'inlined', that is 100000 rows of 1 column
each, then I get read performance of around 400 columns per second.
Does that mean I should put columns in the same row only if every request will
read at least 40 columns at a time?
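
To spell out the arithmetic behind those numbers (nothing here beyond the 50KB
column size and the rates measured above):

# Back-of-the-envelope math for the rates measured above (Python 2).
column_size = 50 * 1000          # bytes per column, as inserted

rows_per_sec = 10                # measured rate on the 100-column rows
whole_row = rows_per_sec * 100   # ~1000 columns/sec when slicing all 100
one_column = rows_per_sec * 1    # ~10 columns/sec when slicing just 1

print "whole row:  %.0f MB/s" % (whole_row * column_size / 1e6)   # ~50 MB/s
print "one column: %.0f KB/s" % (one_column * column_size / 1e3)  # ~500 KB/s

# break-even vs. the one-column-per-row layout (~400 columns/sec):
print "break-even: %d columns per request" % (400 / rows_per_sec)  # 40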

Just to explain why I'm running such a test, let me quickly describe what I'm
trying to do. I need to store images that are geographically localized. When I
request them, I request 5 to 10 images that are geographically close to each
other. My idea is to have row keys that are the id of a delimited region, and
column names that are the actual geographic positions of the images (the column
values are the image data). Each region (row) will have from 10 to around 10000
images (columns) max, and getting my 5-10 geographically close images then just
amounts to a get_slice, as sketched below.
But when I do that, I get bad read performance (4-5 rows/sec, that is 50 images
max per second and fewer on average). I get better performance by putting one
image per row. And that makes me really sad, as it reduces Cassandra to a basic
key/value store without the free sorting. And I want my free sorting :(
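
For concreteness, here is roughly what that lookup looks like against the 0.6
Thrift API (a sketch only: the get_client helper and the 'Keyspace1'/'Standard1'
names come from the attached script, while the region ids and sortable position
strings are made up for illustration):

# Sketch: fetch the images whose positions fall in [pos_start, pos_end]
# inside one region row.  Column names are sortable position strings, so
# an ordered comparator makes get_slice return them sorted "for free".
from cassandra.ttypes import (ColumnParent, SlicePredicate, SliceRange,
                              ConsistencyLevel)

def images_near(client, region_id, pos_start, pos_end, limit=10):
    parent = ColumnParent('Standard1')          # rows keyed by region id
    predicate = SlicePredicate(
        slice_range=SliceRange(pos_start, pos_end, False, limit))
    result = client.get_slice('Keyspace1', region_id, parent, predicate,
                              ConsistencyLevel.ONE)
    # each entry is a ColumnOrSuperColumn wrapping (position, image bytes)
    return [(r.column.name, r.column.value) for r in result]

# client = get_client('localhost', 9160); client.transport.open()
# images_near(client, 'region_4821', '48.8512_2.3520', '48.8530_2.3550')

With an ordered comparator on the column names, the slice comes back sorted by
position, which is exactly the free sorting I'm after.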

Thanks in advance for any explanation/help.

Cheers,
Sylvain

On Tue, Mar 9, 2010 at 3:34 PM, Jonathan Ellis <jbel...@gmail.com> wrote:
> On Tue, Mar 9, 2010 at 8:31 AM, Sylvain Lebresne <sylv...@yakaz.com> wrote:
>> Well, unless I'm mistaken, that's the same in my example, as in both cases
>> I give stress.py the option '-c 1', which tells it to retrieve only one
>> column each time, even in the case where I have 100 columns per row.
>
> Oh.
>
> Why would you do that? :)
>
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# expects a Cassandra server to be running and listening on port 9160.
# (read tests expect insert tests to have run first too.)

have_multiproc = False
try:
    from multiprocessing import Array as array, Process as Thread
    from uuid import uuid1 as get_ident
    Thread.isAlive = Thread.is_alive
    have_multiproc = True
except ImportError:
    from threading import Thread
    from thread import get_ident
    from array import array
from hashlib import md5
import time, random, sys, os
from random import randint, gauss
from optparse import OptionParser

from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport import THttpClient
from thrift.protocol import TBinaryProtocol

try:
    from cassandra import Cassandra
    from cassandra.ttypes import *
except ImportError:
    # add cassandra directory to sys.path
    L = os.path.abspath(__file__).split(os.path.sep)[:-3]
    root = os.path.sep.join(L)
    _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py')
    sys.path.append(os.path.join(_ipath, 'cassandra'))
    try:
        import Cassandra
        from ttypes import *
    except ImportError:
        print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'"
        sys.exit(2)


parser = OptionParser()
parser.add_option('-n', '--num-keys', type="int", dest="numkeys",
                  help="Number of keys", default=1000**2)
parser.add_option('-t', '--threads', type="int", dest="threads",
                  help="Number of threads/procs to use", default=50)
parser.add_option('-c', '--columns', type="int", dest="columns",
                  help="Number of columns per key", default=5)
parser.add_option('-d', '--nodes', type="string", dest="nodes",
                  help="Host nodes (comma separated)", default="localhost")
parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
                  help="standard deviation factor")
parser.add_option('-r', '--random', action="store_true", dest="random",
                  help="use random key generator (stdev will have no effect)")
parser.add_option('-f', '--file', type="string", dest="file", 
                  help="write output to file")
parser.add_option('-p', '--port', type="int", default=9160, dest="port",
                  help="thrift port")
parser.add_option('-m', '--framed', action="store_true", dest="framed",
                  help="use framed transport")
parser.add_option('-o', '--operation', type="choice", dest="operation",
                  default="insert", choices=('insert', 'read', 'rangeslice'),
                  help="operation to perform")
parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
                  help="number of super columns per key")
parser.add_option('-y', '--family-type', type="choice", dest="cftype",
                  choices=('regular','super'), default='regular',
                  help="column family type")
parser.add_option('-k', '--keep-going', action="store_true", dest="ignore",
                  help="ignore errors inserting or reading")
parser.add_option('-i', '--progress-interval', type="int", default=10,
                  dest="interval", help="progress report interval (seconds)")
parser.add_option('-g', '--get-range-slice-count', type="int", default=1000,
                  dest="rangecount",
                  help="amount of keys to get_range_slice per call")

(options, args) = parser.parse_args()
 
total_keys = options.numkeys
n_threads = options.threads
keys_per_thread = total_keys / n_threads
columns_per_key = options.columns
supers_per_key = options.supers
# this allows client to round robin requests directly for
# simple request load-balancing
nodes = options.nodes.split(',')

# returns keys drawn from a bell curve centered around the middle of the
# key space (0..total_keys).  Remember that about 68% of keys will fall
# within stdev of the mean and about 95% within 2*stdev.
stdev = total_keys * options.stdev
mean = total_keys / 2

def key_generator_gauss():
    fmt = '%0' + str(len(str(total_keys))) + 'd'
    while True:
        guess = gauss(mean, stdev)
        if 0 <= guess < total_keys:
            return fmt % int(guess)
    
# returns keys with equal probability.  this is the worst case for caching.
def key_generator_random():
    fmt = '%0' + str(len(str(total_keys))) + 'd'
    return fmt % randint(0, total_keys - 1)

key_generator = key_generator_gauss
if options.random:
    key_generator = key_generator_random


def get_client(host='127.0.0.1', port=9160, framed=False):
    socket = TSocket.TSocket(host, port)
    if framed:
        transport = TTransport.TFramedTransport(socket)
    else:
        transport = TTransport.TBufferedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    client = Cassandra.Client(protocol)
    client.transport = transport
    return client


class Operation(Thread):
    def __init__(self, i, counts, latencies):
        Thread.__init__(self)
        # generator of the keys to be used
        self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
        # we can't use a local counter, since that won't be visible to the parent
        # under multiprocessing.  instead, the parent passes a "counts" array
        # and an index that is our assigned counter.
        self.idx = i
        self.counts = counts
        # similarly, a shared array for latency totals
        self.latencies = latencies
        # random host for pseudo-load-balancing
        [hostname] = random.sample(nodes, 1)
        # open client
        self.cclient = get_client(hostname, options.port, options.framed)
        self.cclient.transport.open()

# modification vs. the stock stress.py: the column value is a fixed 50KB
# blob read once from /dev/zero, instead of an md5 hexdigest
blob = file('/dev/zero', 'r').read(50000)
print "data size in bytes =", len(blob)

class Inserter(Operation):
    def run(self):
        #data = md5(str(get_ident())).hexdigest()
        data = blob
        columns = [Column(chr(ord('A') + j), data, 0) for j in xrange(columns_per_key)]
        fmt = '%0' + str(len(str(total_keys))) + 'd'
        if 'super' == options.cftype:
            supers = [SuperColumn(chr(ord('A') + j), columns) for j in xrange(supers_per_key)]
        for i in self.range:
            key = fmt % i
            if 'super' == options.cftype:
                cfmap = {'Super1': [ColumnOrSuperColumn(super_column=s) for s in supers]}
            else:
                cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}
            start = time.time()
            try:
                self.cclient.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
            except KeyboardInterrupt:
                raise
            except Exception, e:
                if options.ignore:
                    print e
                else:
                    raise
            self.latencies[self.idx] += time.time() - start
            self.counts[self.idx] += 1
            # modification: throttle the inserts; without this sleep the node
            # dies under the sustained insertion load (each batch row here
            # carries columns_per_key * 50KB of data)
            time.sleep(1.2)


class Reader(Operation):
    def run(self):
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        if 'super' == options.cftype:
            for i in xrange(keys_per_thread):
                key = key_generator()
                for j in xrange(supers_per_key):
                    parent = ColumnParent('Super1', chr(ord('A') + j))
                    start = time.time()
                    try:
                        r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
                        if not r: raise RuntimeError("Key %s not found" % key)
                    except KeyboardInterrupt:
                        raise
                    except Exception, e:
                        if options.ignore:
                            print e
                        else:
                            raise
                    self.latencies[self.idx] += time.time() - start
                    self.counts[self.idx] += 1
        else:
            parent = ColumnParent('Standard1')
            for i in xrange(keys_per_thread):
                key = key_generator()
                start = time.time()
                try:
                    r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
                    if not r: raise RuntimeError("Key %s not found" % key)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                    else:
                        raise
                self.latencies[self.idx] += time.time() - start
                self.counts[self.idx] += 1

class RangeSlicer(Operation):
    def run(self):
        begin = self.range[0]
        end = self.range[-1]
        current = begin
        last = current + options.rangecount
        fmt = '%0' + str(len(str(total_keys))) + 'd'
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        if 'super' == options.cftype:
            while current < end:
                start = fmt % current
                finish = fmt % last
                res = []
                for j in xrange(supers_per_key):
                    parent = ColumnParent('Super1', chr(ord('A') + j))
                    begin = time.time()
                    try:
                        res = self.cclient.get_range_slice('Keyspace1', parent, p, start, finish, options.rangecount, ConsistencyLevel.ONE)
                        if not res: raise RuntimeError("Range not found: %s %s" % (start, finish))
                    except KeyboardInterrupt:
                        raise
                    except Exception, e:
                        if options.ignore:
                            print e
                        else:
                            raise
                    self.latencies[self.idx] += time.time() - begin
                    self.counts[self.idx] += 1
                current += len(res) + 1
                last += len(res)
        else:
            parent = ColumnParent('Standard1')
            while current < end:
                start = fmt % current 
                finish = fmt % last
                begin = time.time()
                try:
                    r = self.cclient.get_range_slice('Keyspace1', parent, p, start, finish, options.rangecount, ConsistencyLevel.ONE)
                    if not r: raise RuntimeError("Range not found: %s %s" % (start, finish))
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                    else:
                        raise
                current += len(r) + 1
                last += len(r)
                self.latencies[self.idx] += time.time() - begin
                self.counts[self.idx] += 1


class OperationFactory:
    @staticmethod
    def create(type, i, counts, latencies):
        if type == 'read':
            return Reader(i, counts, latencies)
        elif type == 'insert':
            return Inserter(i, counts, latencies)
        elif type == 'rangeslice':
            return RangeSlicer(i, counts, latencies)
        else:
            raise RuntimeError, 'Unsupported op!'


class Stress(object):
    counts = array('i', [0] * n_threads)
    latencies = array('d', [0] * n_threads)

    def create_threads(self,type):
        threads = []
        for i in xrange(n_threads):
            th = OperationFactory.create(type, i, self.counts, self.latencies)
            threads.append(th)
            th.start()
        return threads

    def run_test(self,filename,threads):
        start_t = time.time()
        if filename:
            outf = open(filename,'w')
        else:
            outf = sys.stdout
        outf.write('total,interval_op_rate,avg_latency,elapsed_time\n')
        total = old_total = latency = old_latency = 0
        while True:
            time.sleep(options.interval)
            old_total, old_latency = total, latency
            total = sum(self.counts[th.idx] for th in threads)
            latency = sum(self.latencies[th.idx] for th in threads)
            delta = total - old_total
            delta_latency = latency - old_latency
            delta_formatted = (delta_latency / delta) if delta > 0 else 'NAN'
            elapsed_t = int(time.time() - start_t)
            outf.write('%d,%d,%s,%d\n' 
                       % (total, delta / options.interval, delta_formatted, elapsed_t))
            if not [th for th in threads if th.isAlive()]:
                break

    def insert(self):
        threads = self.create_threads('insert')
        self.run_test(options.file, threads)

    def read(self):
        threads = self.create_threads('read')
        self.run_test(options.file, threads)

    def rangeslice(self):
        threads = self.create_threads('rangeslice')
        self.run_test(options.file, threads)

stresser = Stress()
benchmark = getattr(stresser, options.operation, None)
if not have_multiproc:
    print """WARNING: multiprocessing not present, threading will be used.
        Benchmark may not be accurate!"""
benchmark()
