Author: brandonwilliams Date: Wed Jun 1 16:55:27 2011 New Revision: 1130223
URL: http://svn.apache.org/viewvc?rev=1130223&view=rev Log: stress.java daemon mode. Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2267 Added: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd (with props) cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java - copied, changed from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java (with props) cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java (with props) Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Added: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/bin/stressd?rev=1130223&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/bin/stressd (added) +++ cassandra/branches/cassandra-0.8/tools/stress/bin/stressd Wed Jun 1 16:55:27 2011 @@ -0,0 +1,87 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +DESC="Stress Test Daemon" + +if [ "x$CLASSPATH" = "x" ]; then + # Cassandra class files. + if [ ! -d `dirname $0`/../../../build/classes/main ]; then + echo "Unable to locate cassandra class files" >&2 + exit 1 + fi + + # Stress class files. + if [ ! -d `dirname $0`/../build/classes ]; then + echo "Unable to locate stress class files" >&2 + exit 1 + fi + + CLASSPATH=`dirname $0`/../../../build/classes/main + CLASSPATH=$CLASSPATH:`dirname $0`/../../../build/classes/thrift + CLASSPATH=$CLASSPATH:`dirname $0`/../build/classes + for jar in `dirname $0`/../../../lib/*.jar; do + CLASSPATH=$CLASSPATH:$jar + done +fi + +if [ -x $JAVA_HOME/bin/java ]; then + JAVA=$JAVA_HOME/bin/java +else + JAVA=`which java` +fi + +if [ "x$JAVA" = "x" ]; then + echo "Java executable not found (hint: set JAVA_HOME)" >&2 + exit 1 +fi + +case "$1" in + start) + echo "Starting $DESC: " + $JAVA -server -cp $CLASSPATH org.apache.cassandra.stress.StressServer $@ 1> ./stressd.out.log 2> ./stressd.err.log & + echo $! > ./stressd.pid + echo "done." + ;; + + stop) + PID=`cat ./stressd.pid 2> /dev/null` + + if [ "x$PID" = "x" ]; then + echo "$DESC is not running." + else + kill -9 $PID + rm ./stressd.pid + echo "$DESC is stopped." + fi + ;; + + status) + PID=`cat ./stressd.pid 2> /dev/null` + + if [ "x$PID" = "x" ]; then + echo "$DESC is not running." + else + echo "$DESC is running with pid $PID." + fi + ;; + + *) + echo "Usage: $0 start|stop|status [-h <host>]" + ;; +esac + Propchange: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd ------------------------------------------------------------------------------ svn:executable = * Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java Wed Jun 1 16:55:27 2011 @@ -18,6 +18,8 @@ package org.apache.cassandra.stress; import java.io.*; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -public class Session +public class Session implements Serializable { // command line options public static final Options availableOptions = new Options(); @@ -74,6 +76,7 @@ public class Session availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,..."); availableOptions.addOption("W", "no-replicate-on-write",false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work"); availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size"); + availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address."); } private int numKeys = 1000 * 1000; @@ -95,7 +98,7 @@ public class Session private boolean replicateOnWrite = true; private boolean ignoreErrors = false; - private PrintStream out = System.out; + private final String outFileName; private IndexType indexType = null; private Stress.Operations operation = Stress.Operations.INSERT; @@ -110,6 +113,8 @@ public class Session protected int mean; protected float sigma; + public final InetAddress sendToDaemon; + public Session(String[] arguments) throws IllegalArgumentException { float STDev = 0.1f; @@ -181,17 +186,7 @@ public class Session if (cmd.hasOption("r")) random = true; - if (cmd.hasOption("f")) - { - try - { - out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f"))); - } - catch (FileNotFoundException e) - { - System.out.println(e.getMessage()); - } - } + outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null; if (cmd.hasOption("p")) port = Integer.parseInt(cmd.getOptionValue("p")); @@ -264,6 +259,17 @@ public class Session replicateOnWrite = false; averageSizeValues = cmd.hasOption("V"); + + try + { + sendToDaemon = cmd.hasOption("send-to") + ? InetAddress.getByName(cmd.getOptionValue("send-to")) + : null; + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } } catch (ParseException e) { @@ -360,7 +366,14 @@ public class Session public PrintStream getOutputStream() { - return out; + try + { + return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e.getMessage(), e); + } } public int getProgressInterval() @@ -432,16 +445,16 @@ public class Session try { client.system_add_keyspace(keyspace); - out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length)); + System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length)); Thread.sleep(nodes.length * 1000); // seconds } catch (InvalidRequestException e) { - out.println("Unable to create stress keyspace: " + e.getWhy()); + System.err.println("Unable to create stress keyspace: " + e.getWhy()); } catch (Exception e) { - out.println(e.getMessage()); + System.err.println(e.getMessage()); } } Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java Wed Jun 1 16:55:27 2011 @@ -17,15 +17,12 @@ */ package org.apache.cassandra.stress; -import org.apache.cassandra.stress.operations.*; -import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.commons.cli.Option; -import java.io.PrintStream; +import java.io.*; +import java.net.Socket; +import java.net.SocketException; import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.SynchronousQueue; public final class Stress { @@ -36,17 +33,10 @@ public final class Stress public static Session session; public static Random randomizer = new Random(); - - /** - * Producer-Consumer model: 1 producer, N consumers - */ - private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true); + private static volatile boolean stopped = false; public static void main(String[] arguments) throws Exception { - long latency, oldLatency; - int epoch, total, oldTotal, keyCount, oldKeyCount; - try { session = new Session(arguments); @@ -57,111 +47,49 @@ public final class Stress return; } - // creating keyspace and column families - if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD) - { - session.createKeySpaces(); - } - - int threadCount = session.getThreads(); - Thread[] consumers = new Thread[threadCount]; - PrintStream out = session.getOutputStream(); - - out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); + PrintStream outStream = session.getOutputStream(); - int itemsPerThread = session.getKeysPerThread(); - int modulo = session.getNumKeys() % threadCount; - - // creating required type of the threads for the test - for (int i = 0; i < threadCount; i++) + if (session.sendToDaemon != null) { - if (i == threadCount - 1) - itemsPerThread += modulo; // last one is going to handle N + modulo items - - consumers[i] = new Consumer(itemsPerThread); - } - - new Producer().start(); - - // starting worker threads - for (int i = 0; i < threadCount; i++) - { - consumers[i].start(); - } + Socket socket = new Socket(session.sendToDaemon, 2159); - // initialization of the values - boolean terminate = false; - latency = 0; - epoch = total = keyCount = 0; + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); + BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream())); - int interval = session.getProgressInterval(); - int epochIntervals = session.getProgressInterval() * 10; - long testStartTime = System.currentTimeMillis(); + Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out)); - while (!terminate) - { - Thread.sleep(100); - - int alive = 0; - for (Thread thread : consumers) - if (thread.isAlive()) alive++; + out.writeObject(session); - if (alive == 0) - terminate = true; + String line; - epoch++; - - if (terminate || epoch > epochIntervals) + try { - epoch = 0; - - oldTotal = total; - oldLatency = latency; - oldKeyCount = keyCount; - - total = session.operations.get(); - keyCount = session.keys.get(); - latency = session.latency.get(); + while (!socket.isClosed() && (line = inp.readLine()) != null) + { + if (line.equals("END")) + { + out.writeInt(1); + break; + } - int opDelta = total - oldTotal; - int keyDelta = keyCount - oldKeyCount; - double latencyDelta = latency - oldLatency; + outStream.println(line); + } + } + catch (SocketException e) + { + if (!stopped) + e.printStackTrace(); + } - long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000; - String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN"; + out.close(); + inp.close(); - out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds)); - } + socket.close(); } - } - - private static Operation createOperation(int index) - { - switch (session.getOperation()) + else { - case READ: - return new Reader(index); - - case COUNTER_GET: - return new CounterGetter(index); - - case INSERT: - return new Inserter(index); - - case COUNTER_ADD: - return new CounterAdder(index); - - case RANGE_SLICE: - return new RangeSlicer(index); - - case INDEXED_RANGE_SLICE: - return new IndexedRangeSlicer(index); - - case MULTI_GET: - return new MultiGetter(index); + new StressAction(session, outStream).run(); } - - throw new UnsupportedOperationException(); } /** @@ -180,56 +108,35 @@ public final class Stress } } - /** - * Produces exactly N items (awaits each to be consumed) - */ - private static class Producer extends Thread + private static class ShutDown extends Thread { - public void run() - { - for (int i = 0; i < session.getNumKeys(); i++) - { - try - { - operations.put(createOperation(i % session.getNumDifferentKeys())); - } - catch (InterruptedException e) - { - System.err.println("Producer error - " + e.getMessage()); - return; - } - } - } - } - - /** - * Each consumes exactly N items from queue - */ - private static class Consumer extends Thread - { - private final int items; + private final Socket socket; + private final ObjectOutputStream out; - public Consumer(int toConsume) + public ShutDown(Socket socket, ObjectOutputStream out) { - items = toConsume; + this.out = out; + this.socket = socket; } public void run() { - Cassandra.Client client = session.getClient(); - - for (int i = 0; i < items; i++) + try { - try - { - operations.take().run(client); // running job - } - catch (Exception e) + if (!socket.isClosed()) { - System.err.println(e.getMessage()); - System.exit(-1); + System.out.println("Control-C caught. Canceling running action and shutting down..."); + + out.writeInt(1); + out.close(); + + stopped = true; } } + catch (IOException e) + { + e.printStackTrace(); + } } } Copied: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java (from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java) URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java?p2=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java&p1=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java&r1=1130201&r2=1130223&rev=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java Wed Jun 1 16:55:27 2011 @@ -17,90 +17,93 @@ */ package org.apache.cassandra.stress; -import org.apache.cassandra.stress.operations.*; -import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.commons.cli.Option; - import java.io.PrintStream; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; -public final class Stress -{ - public static enum Operations - { - INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET - } - - public static Session session; - public static Random randomizer = new Random(); +import org.apache.cassandra.stress.operations.*; +import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.thrift.Cassandra; +public class StressAction extends Thread +{ /** * Producer-Consumer model: 1 producer, N consumers */ - private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true); + private final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true); + + private final Session client; + private final PrintStream output; - public static void main(String[] arguments) throws Exception + private volatile boolean stop = false; + + public StressAction(Session session, PrintStream out) + { + client = session; + output = out; + } + + public void run() { long latency, oldLatency; int epoch, total, oldTotal, keyCount, oldKeyCount; - try - { - session = new Session(arguments); - } - catch (IllegalArgumentException e) - { - printHelpMessage(); - return; - } - // creating keyspace and column families - if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD) - { - session.createKeySpaces(); - } + if (client.getOperation() == Stress.Operations.INSERT || client.getOperation() == Stress.Operations.COUNTER_ADD) + client.createKeySpaces(); - int threadCount = session.getThreads(); - Thread[] consumers = new Thread[threadCount]; - PrintStream out = session.getOutputStream(); + int threadCount = client.getThreads(); + Consumer[] consumers = new Consumer[threadCount]; - out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); + output.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); - int itemsPerThread = session.getKeysPerThread(); - int modulo = session.getNumKeys() % threadCount; + int itemsPerThread = client.getKeysPerThread(); + int modulo = client.getNumKeys() % threadCount; // creating required type of the threads for the test - for (int i = 0; i < threadCount; i++) - { + for (int i = 0; i < threadCount; i++) { if (i == threadCount - 1) itemsPerThread += modulo; // last one is going to handle N + modulo items consumers[i] = new Consumer(itemsPerThread); } - new Producer().start(); + Producer producer = new Producer(); + producer.start(); // starting worker threads for (int i = 0; i < threadCount; i++) - { consumers[i].start(); - } // initialization of the values boolean terminate = false; latency = 0; epoch = total = keyCount = 0; - int interval = session.getProgressInterval(); - int epochIntervals = session.getProgressInterval() * 10; + int interval = client.getProgressInterval(); + int epochIntervals = client.getProgressInterval() * 10; long testStartTime = System.currentTimeMillis(); while (!terminate) { - Thread.sleep(100); + if (stop) + { + producer.stopProducer(); + + for (Consumer consumer : consumers) + consumer.stopConsume(); + + break; + } + + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + throw new RuntimeException(e.getMessage(), e); + } int alive = 0; for (Thread thread : consumers) @@ -115,83 +118,46 @@ public final class Stress { epoch = 0; - oldTotal = total; - oldLatency = latency; + oldTotal = total; + oldLatency = latency; oldKeyCount = keyCount; - total = session.operations.get(); - keyCount = session.keys.get(); - latency = session.latency.get(); + total = client.operations.get(); + keyCount = client.keys.get(); + latency = client.latency.get(); - int opDelta = total - oldTotal; + int opDelta = total - oldTotal; int keyDelta = keyCount - oldKeyCount; double latencyDelta = latency - oldLatency; long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000; String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN"; - out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds)); + output.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds)); } } - } - - private static Operation createOperation(int index) - { - switch (session.getOperation()) - { - case READ: - return new Reader(index); - - case COUNTER_GET: - return new CounterGetter(index); - - case INSERT: - return new Inserter(index); - case COUNTER_ADD: - return new CounterAdder(index); - - case RANGE_SLICE: - return new RangeSlicer(index); - - case INDEXED_RANGE_SLICE: - return new IndexedRangeSlicer(index); - - case MULTI_GET: - return new MultiGetter(index); - } - - throw new UnsupportedOperationException(); - } - - /** - * Printing out help message - */ - public static void printHelpMessage() - { - System.out.println("Usage: ./bin/stress [options]\n\nOptions:"); - - for(Object o : Session.availableOptions.getOptions()) - { - Option option = (Option) o; - String upperCaseName = option.getLongOpt().toUpperCase(); - System.out.println(String.format("-%s%s, --%s%s%n\t\t%s%n", option.getOpt(), (option.hasArg()) ? " "+upperCaseName : "", - option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription())); - } + // marking an end of the output to the client + output.println("END"); } /** * Produces exactly N items (awaits each to be consumed) */ - private static class Producer extends Thread + private class Producer extends Thread { + private volatile boolean stop = false; + public void run() { - for (int i = 0; i < session.getNumKeys(); i++) + for (int i = 0; i < client.getNumKeys(); i++) { + if (stop) + break; + try { - operations.put(createOperation(i % session.getNumDifferentKeys())); + operations.put(createOperation(i % client.getNumDifferentKeys())); } catch (InterruptedException e) { @@ -200,14 +166,20 @@ public final class Stress } } } + + public void stopProducer() + { + stop = true; + } } /** * Each consumes exactly N items from queue */ - private static class Consumer extends Thread + private class Consumer extends Thread { private final int items; + private volatile boolean stop = false; public Consumer(int toConsume) { @@ -216,21 +188,69 @@ public final class Stress public void run() { - Cassandra.Client client = session.getClient(); + Cassandra.Client connection = client.getClient(); for (int i = 0; i < items; i++) { + if (stop) + break; + try { - operations.take().run(client); // running job + operations.take().run(connection); // running job } catch (Exception e) { - System.err.println(e.getMessage()); - System.exit(-1); + if (output == null) + { + System.err.println(e.getMessage()); + System.exit(-1); + } + + + output.println(e.getMessage()); + break; } } } + + public void stopConsume() + { + stop = true; + } + } + + private Operation createOperation(int index) + { + switch (client.getOperation()) + { + case READ: + return new Reader(client, index); + + case COUNTER_GET: + return new CounterGetter(client, index); + + case INSERT: + return new Inserter(client, index); + + case COUNTER_ADD: + return new CounterAdder(client, index); + + case RANGE_SLICE: + return new RangeSlicer(client, index); + + case INDEXED_RANGE_SLICE: + return new IndexedRangeSlicer(client, index); + + case MULTI_GET: + return new MultiGetter(client, index); + } + + throw new UnsupportedOperationException(); } + public void stopAction() + { + stop = true; + } } Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java?rev=1130223&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java (added) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java Wed Jun 1 16:55:27 2011 @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.stress; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; + +import org.apache.cassandra.stress.server.StressThread; +import org.apache.commons.cli.*; + +public class StressServer +{ + private static final Options availableOptions = new Options(); + + static + { + availableOptions.addOption("h", "host", true, "Host to listen for connections."); + } + + public static void main(String[] args) throws Exception + { + ServerSocket serverSocket = null; + CommandLineParser parser = new PosixParser(); + + InetAddress address = InetAddress.getByName("127.0.0.1"); + + try + { + CommandLine cmd = parser.parse(availableOptions, args); + + if (cmd.hasOption("h")) + { + address = InetAddress.getByName(cmd.getOptionValue("h")); + } + } + catch (ParseException e) + { + System.err.printf("Usage: ./bin/stressd start|stop|status [-h <host>]"); + System.exit(1); + } + + try + { + serverSocket = new ServerSocket(2159, 0, address); + } + catch (IOException e) + { + System.err.printf("Could not listen on port: %s:2159.%n", address.getHostAddress()); + System.exit(1); + } + + for (;;) + new StressThread(serverSocket.accept()).start(); + } +} Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -31,9 +32,9 @@ import java.util.Map; public class CounterAdder extends Operation { - public CounterAdder(int index) + public CounterAdder(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -27,9 +28,9 @@ import java.util.List; public class CounterGetter extends Operation { - public CounterGetter(int index) + public CounterGetter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends { private static List<ByteBuffer> values = null; - public IndexedRangeSlicer(int index) + public IndexedRangeSlicer(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -33,9 +34,9 @@ public class Inserter extends Operation { private static List<ByteBuffer> values; - public Inserter(int index) + public Inserter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -31,9 +32,9 @@ import java.util.Map; public class MultiGetter extends Operation { - public MultiGetter(int index) + public MultiGetter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -30,9 +31,9 @@ import java.util.List; public class RangeSlicer extends Operation { - public RangeSlicer(int index) + public RangeSlicer(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Wed Jun 1 16:55:27 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -29,9 +30,9 @@ import static com.google.common.base.Cha public class Reader extends Operation { - public Reader(int index) + public Reader(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java?rev=1130223&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java (added) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java Wed Jun 1 16:55:27 2011 @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.stress.server; + +import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.StressAction; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.PrintStream; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +public class StressThread extends Thread +{ + private final Socket socket; + + public StressThread(Socket client) + { + this.socket = client; + } + + public void run() + { + try + { + ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); + PrintStream out = new PrintStream(socket.getOutputStream()); + + StressAction action = new StressAction((Session) in.readObject(), out); + action.start(); + + while (action.isAlive()) + { + try + { + if (in.readInt() == 1) + { + action.stopAction(); + break; + } + } + catch (Exception e) + { + // continue without problem + } + } + + out.close(); + in.close(); + socket.close(); + } + catch (IOException e) + { + throw new RuntimeException(e.getMessage(), e); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + +} Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1130223&r1=1130222&r2=1130223&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original) +++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Wed Jun 1 16:55:27 2011 @@ -46,6 +46,12 @@ public abstract class Operation session = Stress.session; } + public Operation(Session client, int idx) + { + index = idx; + session = client; + } + /** * Run operation * @param client Cassandra Thrift client connection @@ -101,18 +107,18 @@ public abstract class Operation * key generator using Gauss or Random algorithm * @return byte[] representation of the key string */ - protected static byte[] generateKey() + protected byte[] generateKey() { - return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey(); + return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey(); } /** * Random key generator * @return byte[] representation of the key string */ - private static byte[] generateRandomKey() + private byte[] generateRandomKey() { - String format = "%0" + Stress.session.getTotalKeysLength() + "d"; + String format = "%0" + session.getTotalKeysLength() + "d"; return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8); } @@ -120,9 +126,8 @@ public abstract class Operation * Gauss key generator * @return byte[] representation of the key string */ - private static byte[] generateGaussKey() + private byte[] generateGaussKey() { - Session session = Stress.session; String format = "%0" + session.getTotalKeysLength() + "d"; for (;;)