Updated Branches: refs/heads/trunk 34235ad7b -> 2e1e98ad0
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
package org.apache.cassandra.stress.util;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;

/**
 * A {@link ThriftClient} that sends each keyed request directly to one of the hosts that
 * {@link Metadata#getReplicas} reports for the request's partition key, rotating between those hosts
 * round-robin.
 * <p>
 * Raw Thrift connections are pooled per {@link Host}. A connection is taken out of its host's pool for the
 * duration of a single call and put back afterwards (even if the call throws).
 * <p>
 * Prepared statements are handled lazily. {@code prepare_cql_query}/{@code prepare_cql3_query} only assign a
 * client-side id to the query text. Each pooled connection prepares the statement on its own server the first
 * time it is asked to execute that id.
 * <p>
 * {@code multiget_slice}, {@code get_range_slices} and {@code get_indexed_slices} are not supported and throw
 * {@link UnsupportedOperationException}.
 */
public class SmartThriftClient implements ThriftClient
{

    final String keyspace;
    final Metadata metadata;
    final StressSettings settings;
    // idle connections, pooled per replica host
    final ConcurrentHashMap<Host, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>();

    // client-side prepared statement ids, mapped in both directions
    final AtomicInteger queryIdCounter = new AtomicInteger();
    final ConcurrentHashMap<Integer, String> queryStrings = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Integer> queryIds = new ConcurrentHashMap<>();

    // drives replica selection in get(ByteBuffer)
    private final AtomicInteger roundrobin = new AtomicInteger();

    public SmartThriftClient(StressSettings settings, String keyspace, Metadata metadata)
    {
        this.metadata = metadata;
        this.keyspace = keyspace;
        this.settings = settings;
    }

    /**
     * Returns the client-side id for {@code query}, allocating a new one if the query has not been seen before.
     * Concurrent callers with the same query text always receive the same id.
     * <p>
     * The query text is published in {@code queryStrings} <em>before</em> the id becomes reachable through
     * {@code queryIds}. Any thread that obtains an id can therefore immediately resolve it back to its text.
     */
    private Integer getId(String query)
    {
        Integer existing = queryIds.get(query);
        if (existing != null)
            return existing;

        Integer candidate = queryIdCounter.incrementAndGet();
        queryStrings.put(candidate, query);
        existing = queryIds.putIfAbsent(query, candidate);
        if (existing == null)
            return candidate;

        // another thread registered this query first: discard our unused id and use theirs
        queryStrings.remove(candidate);
        return existing;
    }

    /**
     * A single raw Thrift connection to one host, together with the server-side ids of the statements it has
     * prepared. An instance is held by at most one caller at a time: it is removed from its pool by
     * get(ByteBuffer) and only returned by release(Client) after the call. For that reason the plain
     * {@link HashMap} is never accessed concurrently.
     */
    final class Client
    {
        final Cassandra.Client client;
        final Host host;
        // client-side statement id -> item id returned by this connection's server
        final Map<Integer, Integer> queryMap = new HashMap<>();

        Client(Cassandra.Client client, Host host)
        {
            this.client = client;
            this.host = host;
        }

        /**
         * Returns this connection's server-side id for client-side statement {@code id}, preparing the statement
         * on this connection's server on first use.
         *
         * @param cql3 whether to prepare with the CQL3 ({@code true}) or CQL2 ({@code false}) Thrift call
         */
        Integer get(Integer id, boolean cql3) throws TException
        {
            Integer serverId = queryMap.get(id);
            if (serverId != null)
                return serverId;
            prepare(id, cql3);
            return queryMap.get(id);
        }

        /**
         * Prepares the query registered under client-side {@code id} on this connection and records the
         * server's item id.
         *
         * @throws IllegalArgumentException if {@code id} was not returned by one of the prepare_* methods
         */
        void prepare(Integer id, boolean cql3) throws TException
        {
            // getId publishes the text before the id is handed out, so a missing entry means an unknown id
            String query = queryStrings.get(id);
            if (query == null)
                throw new IllegalArgumentException("Unknown prepared statement id: " + id);

            ByteBuffer queryBytes = ByteBufferUtil.bytes(query);
            Integer serverId = cql3
                             ? client.prepare_cql3_query(queryBytes, Compression.NONE).itemId
                             : client.prepare_cql_query(queryBytes, Compression.NONE).itemId;
            queryMap.put(id, serverId);
        }
    }

    /**
     * Checks out a connection to one of the replicas of {@code pk}. Successive calls rotate round-robin over the
     * replica set. A pooled idle connection is reused when available; otherwise a new one is opened. The caller
     * must hand the connection back with release(Client).
     *
     * @throws IllegalStateException if the metadata reports no replicas for {@code pk}
     */
    private Client get(ByteBuffer pk)
    {
        Set<Host> hosts = metadata.getReplicas(keyspace, pk);
        if (hosts.isEmpty())
            throw new IllegalStateException("No replicas known for partition key in keyspace " + keyspace);

        int position = roundrobin.incrementAndGet() % hosts.size();
        if (position < 0)
            position = -position; // the counter wraps to negative values on overflow
        Iterator<Host> iter = hosts.iterator();
        for (int skipped = 0 ; skipped < position ; skipped++)
            iter.next();
        Host host = iter.next();

        ConcurrentLinkedQueue<Client> q = cache.get(host);
        if (q == null)
        {
            ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<>();
            q = cache.putIfAbsent(host, newQ);
            if (q == null)
                q = newQ;
        }
        Client tclient = q.poll();
        if (tclient != null)
            return tclient;
        return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host);
    }

    /** Returns a connection obtained from get(ByteBuffer) to its host's pool. */
    private void release(Client client)
    {
        // the queue was created in get(ByteBuffer) before this client was handed out
        cache.get(client.host).add(client);
    }

    @Override
    public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> record, ConsistencyLevel consistencyLevel) throws TException
    {
        // each partition may have different replicas, so every key is sent as its own single-key batch
        for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> e : record.entrySet())
        {
            Client client = get(e.getKey());
            try
            {
                client.client.batch_mutate(Collections.singletonMap(e.getKey(), e.getValue()), consistencyLevel);
            }
            finally
            {
                release(client);
            }
        }
    }

    @Override
    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent parent, SlicePredicate predicate, ConsistencyLevel consistencyLevel) throws InvalidRequestException, UnavailableException, TimedOutException, TException
    {
        Client client = get(key);
        try
        {
            return client.client.get_slice(key, parent, predicate, consistencyLevel);
        }
        finally
        {
            release(client);
        }
    }

    @Override
    public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
    {
        Client client = get(key);
        try
        {
            client.client.insert(key, column_parent, column, consistency_level);
        }
        finally
        {
            release(client);
        }
    }

    @Override
    public CqlResult execute_cql_query(String query, ByteBuffer key, Compression compression) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
    {
        Client client = get(key);
        try
        {
            return client.client.execute_cql_query(ByteBufferUtil.bytes(query), compression);
        }
        finally
        {
            release(client);
        }
    }

    @Override
    public CqlResult execute_cql3_query(String query, ByteBuffer key, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
    {
        Client client = get(key);
        try
        {
            return client.client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, consistency);
        }
        finally
        {
            release(client);
        }
    }

    /** Registers {@code query} and returns its client-side id; nothing is sent to the cluster here. */
    @Override
    public Integer prepare_cql3_query(String query, Compression compression) throws InvalidRequestException, TException
    {
        return getId(query);
    }

    @Override
    public CqlResult execute_prepared_cql3_query(int queryId, ByteBuffer key, List<ByteBuffer> values, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
    {
        Client client = get(key);
        try
        {
            return client.client.execute_prepared_cql3_query(client.get(queryId, true), values, consistency);
        }
        finally
        {
            release(client);
        }
    }

    /** Registers {@code query} and returns its client-side id; nothing is sent to the cluster here. */
    @Override
    public Integer prepare_cql_query(String query, Compression compression) throws InvalidRequestException, TException
    {
        return getId(query);
    }

    @Override
    public CqlResult execute_prepared_cql_query(int queryId, ByteBuffer key, List<ByteBuffer> values) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
    {
        Client client = get(key);
        try
        {
            // CQL2 execution needs a statement prepared through the CQL2 prepare call
            return client.client.execute_prepared_cql_query(client.get(queryId, false), values);
        }
        finally
        {
            release(client);
        }
    }

    @Override
    public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
    {
        throw new UnsupportedOperationException();
    }

}
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java
package org.apache.cassandra.stress.util;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.*;
import org.apache.thrift.TException;

/**
 * The subset of Thrift operations used by the stress tool.
 * <p>
 * The CQL execute methods take a partition {@code key} that is not part of the underlying Thrift call. Presumably
 * this lets an implementation choose a node by key (SmartThriftClient routes to a replica of the key).
 * Prepared statements are identified by the {@code Integer} returned from the matching {@code prepare_*} method.
 */
public interface ThriftClient
{

    void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> record, ConsistencyLevel consistencyLevel) throws TException;

    List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent parent, SlicePredicate predicate, ConsistencyLevel consistencyLevel) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

    void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

    Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

    List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

    List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

    /** Prepares a CQL3 statement; the returned id is passed to {@link #execute_prepared_cql3_query}. */
    Integer prepare_cql3_query(String query, Compression compression) throws InvalidRequestException, TException;

    CqlResult execute_prepared_cql3_query(int itemId, ByteBuffer key, List<ByteBuffer> values, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;

    CqlResult execute_cql_query(String query, ByteBuffer key, Compression compression) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;

    CqlResult execute_cql3_query(String query, ByteBuffer key, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;

    /** Prepares a CQL2 statement; the returned id is passed to {@link #execute_prepared_cql_query}. */
    Integer prepare_cql_query(String query, Compression compression) throws InvalidRequestException, TException;

    CqlResult execute_prepared_cql_query(int itemId, ByteBuffer key, List<ByteBuffer> values) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;
}

// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/Timer.java
package org.apache.cassandra.stress.util;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * Records operation latencies for a single thread. A timer must only be started and stopped by one thread.
 * <p>
 * It co-ordinates with the thread collecting results by answering report requests. The collector calls
 * {@link #requestReport}; the owning thread then builds the report at its next {@link #stop} (or in
 * {@link #close}), publishes it in {@link #report}, and counts down the supplied latch.
 * <p>
 * Latencies are kept as a fixed-size random sample. The sampling rate is {@code 1/p(opCount)}, where
 * {@code p} grows by one each time the sample array wraps around.
 */
public final class Timer
{

    private static final int SAMPLE_SIZE_SHIFT = 10;
    private static final int SAMPLE_SIZE_MASK = (1 << SAMPLE_SIZE_SHIFT) - 1;

    private final Random rnd = new Random();

    // System.nanoTime() at the start of the operation currently being timed
    private long sampleStartNanos;

    // each entry is present with probability 1/p(opCount) or 1/(p(opCount)-1)
    private final long[] sample = new long[1 << SAMPLE_SIZE_SHIFT];
    private int opCount;

    // aggregate info since the last report
    private int keyCount;
    private long total;
    private long max;
    private long maxStart;
    private long upToDateAsOf;
    private long lastSnap = System.nanoTime();

    // communication with summary/logging thread
    private volatile CountDownLatch reportRequest;
    volatile TimingInterval report;
    private volatile TimingInterval finalReport;

    /** Marks the start of an operation by recording the current {@link System#nanoTime()}. */
    public void start()
    {
        sampleStartNanos = System.nanoTime();
    }

    // sampling period: 1 for operations 0..1023 since the last report, 2 for 1024..2047, and so on
    private static int p(int index)
    {
        return 1 + (index >>> SAMPLE_SIZE_SHIFT);
    }

    /**
     * Marks the end of the operation begun by the last {@link #start()}, which covered {@code keys} keys.
     * Any outstanding report request is answered first, so that report ends at the previous operation.
     */
    public void stop(int keys)
    {
        maybeReport();
        long now = System.nanoTime();
        long time = now - sampleStartNanos;
        if (rnd.nextInt(p(opCount)) == 0)
            sample[index(opCount)] = time;
        if (time > max)
        {
            maxStart = sampleStartNanos;
            max = time;
        }
        total += time;
        opCount += 1;
        keyCount += keys;
        upToDateAsOf = now;
    }

    // slot in the sample array for the count'th operation
    private static int index(int count)
    {
        return count & SAMPLE_SIZE_MASK;
    }

    /**
     * Builds an interval covering everything since the last report, then resets the aggregates.
     * The longest single operation is reported as this timer's pause (pauseStart = its start, pauseLength = its
     * duration).
     */
    private TimingInterval buildReport()
    {
        // slots before index(opCount) were written in the current pass at rate 1/p; later slots in the previous pass at 1/(p-1)
        final List<SampleOfLongs> sampleLatencies = Arrays.asList
        (       new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)),
                new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1)
        );
        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, keyCount, total, opCount,
                SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
        // reset counters
        opCount = 0;
        keyCount = 0;
        total = 0;
        max = 0;
        lastSnap = upToDateAsOf;
        return report;
    }

    // checks to see if a report has been requested, and if so produces the report, signals and clears the request
    private void maybeReport()
    {
        if (reportRequest != null)
        {
            synchronized (this)
            {
                report = buildReport();
                reportRequest.countDown();
                reportRequest = null;
            }
        }
    }

    /**
     * Requests a report from this timer; {@code signal} is counted down once {@link #report} holds it.
     * If the timer has already been closed, the stored final report is handed over immediately, and any later
     * request receives an empty interval.
     */
    synchronized void requestReport(CountDownLatch signal)
    {
        if (finalReport != null)
        {
            report = finalReport;
            finalReport = new TimingInterval(0);
            signal.countDown();
        }
        else
            reportRequest = signal;
    }

    /**
     * Closes the timer. If a request is outstanding, it is answered now. Otherwise the remaining data is stored
     * as the final report, to be returned by the next {@link #requestReport}.
     */
    public synchronized void close()
    {
        if (reportRequest == null)
            finalReport = buildReport();
        else
        {
            finalReport = new TimingInterval(0);
            report = buildReport();
            reportRequest.countDown();
            reportRequest = null;
        }
    }

}

// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/Timing.java
package org.apache.cassandra.stress.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Relatively simple timing class for getting a uniform sample of latencies and saving other metrics.
 * <p>
 * Accuracy comes from using single-threaded {@link Timer}s that are check-pointed by the snapping thread, which
 * waits for them to report back. Each timer reports data up to its last event before the check-point. If the
 * threads are blocked or paused, an interval may be longer than the check-point period, but all metrics
 * calculated over the interval are accurate.
 * <p>
 * {@link #start()} must be called before {@link #snapInterval()}.
 */
public class Timing
{

    private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
    private volatile TimingInterval history;
    private final Random rnd = new Random();

    // TIMING

    // asks every registered timer for a report, waits (up to two minutes) for all of them, and merges the results
    private TimingInterval snapInterval(Random rnd) throws InterruptedException
    {
        final Timer[] timers = this.timers.toArray(new Timer[0]);
        final CountDownLatch ready = new CountDownLatch(timers.length);
        for (Timer timer : timers)
            timer.requestReport(ready);

        // TODO fail gracefully after timeout if a thread is stuck
        if (!ready.await(2L, TimeUnit.MINUTES))
            throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck");

        // reports have been filled in by the timer threads, so merge
        List<TimingInterval> intervals = new ArrayList<>();
        for (Timer timer : timers)
            intervals.add(timer.report);

        return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos());
    }

    /** Builds a new timer and adds it to the set of running timers; it is included in every later snapshot. */
    public Timer newTimer()
    {
        final Timer timer = new Timer();
        timers.add(timer);
        return timer;
    }

    /** Starts the history as an empty interval beginning now. */
    public void start()
    {
        history = new TimingInterval(System.nanoTime());
    }

    /**
     * Collects the interval since the previous snapshot from every timer and folds it into the history.
     * The history sample is capped at 50000 latencies.
     *
     * @return the merged interval for this snapshot alone
     */
    public TimingInterval snapInterval() throws InterruptedException
    {
        final TimingInterval interval = snapInterval(rnd);
        history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 50000, history.startNanos());
        return interval;
    }

    /** Returns the merged interval covering everything since {@link #start()}. */
    public TimingInterval getHistory()
    {
        return history;
    }

}

// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
package org.apache.cassandra.stress.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Measurements taken over an interval of time; used for both single-timer results and merged results.
 * Times and latencies are stored in nanoseconds.
 */
public final class TimingInterval
{
    // nanos
    private final long start;
    private final long end;
    public final long maxLatency;
    public final long pauseLength;
    public final long pauseStart;
    public final long totalLatency;

    // discrete
    public final long keyCount;
    public final long operationCount;

    final SampleOfLongs sample;

    /** An empty interval that starts and ends at {@code time}. */
    TimingInterval(long time)
    {
        start = end = time;
        maxLatency = totalLatency = 0;
        keyCount = operationCount = 0;
        pauseStart = pauseLength = 0;
        sample = new SampleOfLongs(new long[0], 1d);
    }

    // end is clamped so that it is never before start
    TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long keyCount, long totalLatency, long operationCount, SampleOfLongs sample)
    {
        this.start = start;
        this.end = Math.max(end, start);
        this.maxLatency = maxLatency;
        this.keyCount = keyCount;
        this.totalLatency = totalLatency;
        this.operationCount = operationCount;
        this.pauseStart = pauseStart;
        this.pauseLength = pauseLength;
        this.sample = sample;
    }

    /**
     * Merges multiple intervals into one that starts at {@code start} and ends at the latest input end.
     * Counts and total latency are summed and max latency is the maximum. Latency samples are merged down to at
     * most {@code maxSamples}.
     * <p>
     * The merged pause is the intersection of the inputs' non-empty pauses. It is empty (0, 0) if no input has a
     * pause or the pauses do not overlap.
     */
    static TimingInterval merge(Random rnd, List<TimingInterval> intervals, int maxSamples, long start)
    {
        // long accumulators: the fields are long, and merged history can exceed Integer.MAX_VALUE operations
        long operationCount = 0, keyCount = 0;
        long maxLatency = 0, totalLatency = 0;
        List<SampleOfLongs> latencies = new ArrayList<>();
        long end = 0;
        boolean anyPause = false;
        long pauseStart = Long.MIN_VALUE, pauseEnd = Long.MAX_VALUE;
        for (TimingInterval interval : intervals)
        {
            end = Math.max(end, interval.end);
            operationCount += interval.operationCount;
            maxLatency = Math.max(interval.maxLatency, maxLatency);
            totalLatency += interval.totalLatency;
            keyCount += interval.keyCount;
            latencies.add(interval.sample);
            if (interval.pauseLength > 0)
            {
                anyPause = true;
                pauseStart = Math.max(pauseStart, interval.pauseStart);
                pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
            }
        }
        // without this check an input set with no pauses would report a pause of Long.MAX_VALUE
        if (!anyPause || pauseEnd < pauseStart)
            pauseEnd = pauseStart = 0;
        return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, keyCount, totalLatency, operationCount,
                SampleOfLongs.merge(rnd, latencies, maxSamples));

    }

    /** Operations per second over the whole interval. */
    public double realOpRate()
    {
        return operationCount / ((end - start) * 0.000000001d);
    }

    /** Operations per second over the interval with the pause length subtracted. */
    public double adjustedOpRate()
    {
        return operationCount / ((end - (start + pauseLength)) * 0.000000001d);
    }

    /** Keys per second over the whole interval. */
    public double keyRate()
    {
        return keyCount / ((end - start) * 0.000000001d);
    }

    /** Mean operation latency in milliseconds. */
    public double meanLatency()
    {
        return (totalLatency / (double) operationCount) * 0.000001d;
    }

    /** Maximum operation latency in milliseconds. */
    public double maxLatency()
    {
        return maxLatency * 0.000001d;
    }

    /** Length of the interval in milliseconds. */
    public long runTime()
    {
        return (end - start) / 1000000;
    }

    // delegates to the latency sample; units are whatever SampleOfLongs reports (presumably ms — confirm)
    public double medianLatency()
    {
        return sample.medianLatency();
    }

    // 0 < rank < 1
    public double rankLatency(float rank)
    {
        return sample.rankLatency(rank);
    }

    public final long endNanos()
    {
        return end;
    }

    public final long endMillis()
    {
        return end / 1000000;
    }

    public long startNanos()
    {
        return start;
    }
}

// http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java
// new file: tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java
package org.apache.cassandra.stress.util;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/**
 * Tracks a running mean of measurements and its relative uncertainty: the standard error of the mean divided by
 * the mean. Threads can block until a target uncertainty is reached.
 */
// TODO: do not assume normal distribution of measurements.
public class Uncertainty
{

    private int measurements;
    private double sumsquares;
    private double sum;
    private double stdev;
    private double mean;
    private double uncertainty;

    private final CopyOnWriteArrayList<WaitForTargetUncertainty> waiting = new CopyOnWriteArrayList<>();

    // a blocked await() call and the condition that releases it
    private static final class WaitForTargetUncertainty
    {
        final double targetUncertainty;
        final int minMeasurements;
        final int maxMeasurements;
        final CountDownLatch latch = new CountDownLatch(1);

        private WaitForTargetUncertainty(double targetUncertainty, int minMeasurements, int maxMeasurements)
        {
            this.targetUncertainty = targetUncertainty;
            this.minMeasurements = minMeasurements;
            this.maxMeasurements = maxMeasurements;
        }

        void await() throws InterruptedException
        {
            latch.await();
        }

    }

    /**
     * Adds a measurement, recomputes mean/stdev/uncertainty, and releases every waiter whose condition now holds.
     * A waiter is released when uncertainty is below its target with at least its minimum number of
     * measurements, or when its maximum number of measurements is reached.
     */
    public void update(double value)
    {
        measurements++;
        sumsquares += value * value;
        sum += value;
        mean = sum / measurements;
        // E[x^2] - E[x]^2 can come out slightly negative through floating-point cancellation (e.g. identical
        // values); clamp so stdev is 0 rather than NaN, which would never compare below a target
        double variance = Math.max(0d, (sumsquares / measurements) - (mean * mean));
        stdev = Math.sqrt(variance);
        uncertainty = (stdev / Math.sqrt(measurements)) / mean;

        for (WaitForTargetUncertainty waiter : waiting)
        {
            if ((uncertainty < waiter.targetUncertainty && measurements >= waiter.minMeasurements) || (measurements >= waiter.maxMeasurements))
            {
                waiter.latch.countDown();
                // can safely remove as working over snapshot with COWArrayList
                waiting.remove(waiter);
            }
        }
    }

    /**
     * Blocks until an {@link #update} satisfies the given condition, or until {@link #wakeAll()} is called.
     * The condition is only evaluated inside update(), so it is not checked against measurements already taken.
     */
    public void await(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
    {
        final WaitForTargetUncertainty wait = new WaitForTargetUncertainty(targetUncertainty, minMeasurements, maxMeasurements);
        waiting.add(wait);
        wait.await();
    }

    public double getUncertainty()
    {
        return uncertainty;
    }

    /** Releases every thread currently blocked in {@link #await}. */
    public void wakeAll()
    {
        for (WaitForTargetUncertainty waiter : waiting)
        {
            waiter.latch.countDown();
            waiting.remove(waiter);
        }
    }

}