Updated Branches:
  refs/heads/trunk 34235ad7b -> 2e1e98ad0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
new file mode 100644
index 0000000..99fa452
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
@@ -0,0 +1,235 @@
+package org.apache.cassandra.stress.util;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public class SmartThriftClient implements ThriftClient
+{
+
+    final String keyspace;
+    final Metadata metadata;
+    final StressSettings settings;
+    final ConcurrentHashMap<Host, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>();
+
+    final AtomicInteger queryIdCounter = new AtomicInteger();
+    final ConcurrentHashMap<Integer, String> queryStrings = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<String, Integer> queryIds = new ConcurrentHashMap<>();
+
+    public SmartThriftClient(StressSettings settings, String keyspace, Metadata metadata)
+    {
+        this.metadata = metadata;
+        this.keyspace = keyspace;
+        this.settings = settings;
+    }
+
+    private final AtomicInteger roundrobin = new AtomicInteger();
+
+    private Integer getId(String query)
+    {
+        Integer r;
+        if ((r = queryIds.get(query)) != null)
+            return r;
+        r = queryIdCounter.incrementAndGet();
+        if (queryIds.putIfAbsent(query, r) == null)
+        {
+            // we won the race to register this query: publish the string so connections can prepare it
+            queryStrings.put(r, query);
+            return r;
+        }
+        // another thread registered the query first; use its id
+        return queryIds.get(query);
+    }
+
+    final class Client
+    {
+        final Cassandra.Client client;
+        final Host host;
+        final Map<Integer, Integer> queryMap = new HashMap<>();
+
+        Client(Cassandra.Client client, Host host)
+        {
+            this.client = client;
+            this.host = host;
+        }
+
+        Integer get(Integer id, boolean cql3) throws TException
+        {
+            Integer serverId = queryMap.get(id);
+            if (serverId != null)
+                return serverId;
+            prepare(id, cql3);
+            return queryMap.get(id);
+        }
+
+        void prepare(Integer id, boolean cql3) throws TException
+        {
+            String query;
+            // spin until the thread that registered this id has published the query string
+            while (null == (query = queryStrings.get(id))) ;
+            if (cql3)
+            {
+                Integer serverId = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE).itemId;
+                queryMap.put(id, serverId);
+            }
+            else
+            {
+                Integer serverId = client.prepare_cql_query(ByteBufferUtil.bytes(query), Compression.NONE).itemId;
+                queryMap.put(id, serverId);
+            }
+        }
+    }
+
+    private Client get(ByteBuffer pk)
+    {
+        Set<Host> hosts = metadata.getReplicas(keyspace, pk);
+        int count = roundrobin.incrementAndGet() % hosts.size();
+        if (count < 0)
+            count = -count;
+        Iterator<Host> iter = hosts.iterator();
+        // skip 'count' replicas so successive calls round-robin over the replica set
+        while (count-- > 0 && iter.hasNext())
+            iter.next();
+        Host host = iter.next();
+        ConcurrentLinkedQueue<Client> q = cache.get(host);
+        if (q == null)
+        {
+            ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>();
+            q = cache.putIfAbsent(host, newQ);
+            if (q == null)
+                q = newQ;
+        }
+        Client tclient = q.poll();
+        if (tclient != null)
+            return tclient;
+        return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host);
+    }
+
+    @Override
+    public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> record, ConsistencyLevel consistencyLevel) throws TException
+    {
+        for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> e : record.entrySet())
+        {
+            Client client = get(e.getKey());
+            try
+            {
+                client.client.batch_mutate(Collections.singletonMap(e.getKey(), e.getValue()), consistencyLevel);
+            } finally
+            {
+                cache.get(client.host).add(client);
+            }
+        }
+    }
+
+    @Override
+    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent parent, SlicePredicate predicate, ConsistencyLevel consistencyLevel) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            return client.client.get_slice(key, parent, predicate, consistencyLevel);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            client.client.insert(key, column_parent, column, consistency_level);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public CqlResult execute_cql_query(String query, ByteBuffer key, Compression compression) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            return client.client.execute_cql_query(ByteBufferUtil.bytes(query), compression);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public CqlResult execute_cql3_query(String query, ByteBuffer key, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            return client.client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, consistency);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public Integer prepare_cql3_query(String query, Compression compression) throws InvalidRequestException, TException
+    {
+        return getId(query);
+    }
+
+    @Override
+    public CqlResult execute_prepared_cql3_query(int queryId, ByteBuffer key, List<ByteBuffer> values, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            return client.client.execute_prepared_cql3_query(client.get(queryId, true), values, consistency);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public Integer prepare_cql_query(String query, Compression compression) throws InvalidRequestException, TException
+    {
+        return getId(query);
+    }
+
+    @Override
+    public CqlResult execute_prepared_cql_query(int queryId, ByteBuffer key, List<ByteBuffer> values) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        Client client = get(key);
+        try
+        {
+            // cql3=false: this path must be prepared via prepare_cql_query, not the cql3 variant
+            return client.client.execute_prepared_cql_query(client.get(queryId, false), values);
+        } finally
+        {
+            cache.get(client.host).add(client);
+        }
+    }
+
+    @Override
+    public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+}
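
For reference, the class above keeps one queue of open Thrift connections per replica host: the queue is created lazily with putIfAbsent, a connection is poll()ed before each operation, and it is returned to the queue in a finally block. A minimal standalone sketch of that pooling pattern, with hypothetical names (not code from this patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    final class PerKeyPool<K, C>
    {
        private final ConcurrentHashMap<K, ConcurrentLinkedQueue<C>> pools = new ConcurrentHashMap<>();

        // borrow a pooled instance for this key, or null if the pool for it is currently empty
        C borrow(K key)
        {
            ConcurrentLinkedQueue<C> q = pools.get(key);
            if (q == null)
            {
                ConcurrentLinkedQueue<C> newQ = new ConcurrentLinkedQueue<>();
                q = pools.putIfAbsent(key, newQ); // first writer wins; losers adopt the winner's queue
                if (q == null)
                    q = newQ;
            }
            return q.poll();
        }

        // hand an instance back to its pool once the caller is done (assumes borrow() was called for this key)
        void release(K key, C instance)
        {
            pools.get(key).add(instance);
        }
    }

ConcurrentLinkedQueue keeps borrow/release lock-free, and the putIfAbsent race simply discards the losing queue, matching the structure of get(ByteBuffer) above.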

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java b/tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java
new file mode 100644
index 0000000..1ceca29
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/ThriftClient.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.stress.util;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.thrift.TException;
+
+public interface ThriftClient
+{
+
+    public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> record, ConsistencyLevel consistencyLevel) throws TException;
+
+    List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent parent, SlicePredicate predicate, ConsistencyLevel consistencyLevel) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
+
+    void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
+
+    Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
+
+    List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
+
+    List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
+
+    Integer prepare_cql3_query(String query, Compression compression) throws InvalidRequestException, TException;
+
+    CqlResult execute_prepared_cql3_query(int itemId, ByteBuffer key, List<ByteBuffer> values, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;
+
+    CqlResult execute_cql_query(String query, ByteBuffer key, Compression compression) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;
+
+    CqlResult execute_cql3_query(String query, ByteBuffer key, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;
+
+    Integer prepare_cql_query(String query, Compression compression) throws InvalidRequestException, TException;
+
+    CqlResult execute_prepared_cql_query(int itemId, ByteBuffer key, List<ByteBuffer> values) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
new file mode 100644
index 0000000..c216561
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -0,0 +1,129 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+// a timer - this timer must be used by a single thread, and co-ordinates with other timers by reporting its results to the Timing class on request
+public final class Timer
+{
+
+    private static final int SAMPLE_SIZE_SHIFT = 10;
+    private static final int SAMPLE_SIZE_MASK = (1 << SAMPLE_SIZE_SHIFT) - 1;
+
+    private final Random rnd = new Random();
+
+    // in progress snap start
+    private long sampleStartNanos;
+
+    // each entry is present with probability 1/p(opCount) or 1/(p(opCount)-1)
+    private final long[] sample = new long[1 << SAMPLE_SIZE_SHIFT];
+    private int opCount;
+
+    // aggregate info
+    private int keyCount;
+    private long total;
+    private long max;
+    private long maxStart;
+    private long upToDateAsOf;
+    private long lastSnap = System.nanoTime();
+
+    // communication with summary/logging thread
+    private volatile CountDownLatch reportRequest;
+    volatile TimingInterval report;
+    private volatile TimingInterval finalReport;
+
+    public void start()
+    {
+        // record when this operation began; whether it is sampled is decided in stop()
+        sampleStartNanos = System.nanoTime();
+    }
+
+    private static int p(int index)
+    {
+        return 1 + (index >>> SAMPLE_SIZE_SHIFT);
+    }
+
+    public void stop(int keys)
+    {
+        maybeReport();
+        long now = System.nanoTime();
+        long time = now - sampleStartNanos;
+        if (rnd.nextInt(p(opCount)) == 0)
+            sample[index(opCount)] = time;
+        if (time > max)
+        {
+            maxStart = sampleStartNanos;
+            max = time;
+        }
+        total += time;
+        opCount += 1;
+        keyCount += keys;
+        upToDateAsOf = now;
+    }
+
+    private static int index(int count)
+    {
+        return count & SAMPLE_SIZE_MASK;
+    }
+
+    private TimingInterval buildReport()
+    {
+        final List<SampleOfLongs> sampleLatencies = Arrays.asList
+                (       new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)),
+                        new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1)
+                );
+        final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, keyCount, total, opCount,
+                SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
+        // reset counters
+        opCount = 0;
+        keyCount = 0;
+        total = 0;
+        max = 0;
+        lastSnap = upToDateAsOf;
+        return report;
+    }
+
+    // checks to see if a report has been requested, and if so produces the report, signals and clears the request
+    private void maybeReport()
+    {
+        if (reportRequest != null)
+        {
+            synchronized (this)
+            {
+                report = buildReport();
+                reportRequest.countDown();
+                reportRequest = null;
+            }
+        }
+    }
+
+    // checks to see if the timer is dead; if not requests a report, and otherwise fulfills the request itself
+    synchronized void requestReport(CountDownLatch signal)
+    {
+        if (finalReport != null)
+        {
+            report = finalReport;
+            finalReport = new TimingInterval(0);
+            signal.countDown();
+        }
+        else
+            reportRequest = signal;
+    }
+
+    // closes the timer; if a request is outstanding, it furnishes the request, otherwise it populates finalReport
+    public synchronized void close()
+    {
+        if (reportRequest == null)
+            finalReport = buildReport();
+        else
+        {
+            finalReport = new TimingInterval(0);
+            report = buildReport();
+            reportRequest.countDown();
+            reportRequest = null;
+        }
+    }
+
+}
+
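
For reference, the sampling in Timer keeps a bounded, roughly uniform sample of latencies: the buffer has 1024 slots (SAMPLE_SIZE_SHIFT = 10), index(opCount) wraps with opCount & 1023, and a latency is written to its slot with probability 1/p(opCount), where p(opCount) = 1 + opCount/1024. So the first 1024 operations are always recorded (p = 1); for operations 1024..2047 a new latency overwrites its slot with probability 1/2 (e.g. after 3,000 operations p = 3, so the chance is 1/3), and so on, which keeps each retained value an approximately uniform draw from the whole interval. buildReport() then tags the two halves of the buffer with their sampling probabilities (p and p - 1) so SampleOfLongs.merge can reweight them when combining timers.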

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
new file mode 100644
index 0000000..6f5052f
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -0,0 +1,72 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+// relatively simple timing class for getting a uniform sample of latencies, and saving other metrics
+// ensures accuracy of timing by having single threaded timers that are check-pointed by the snapping thread,
+// which waits for them to report back. They report back the data up to the last event prior to the check-point.
+// if the threads are blocked/paused this may mean a period of time longer than the checkpoint elapses, but that all
+// metrics calculated over the interval are accurate
+public class Timing
+{
+
+    private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
+    private volatile TimingInterval history;
+    private final Random rnd = new Random();
+
+    // TIMING
+
+    private TimingInterval snapInterval(Random rnd) throws InterruptedException
+    {
+        final Timer[] timers = this.timers.toArray(new Timer[0]);
+        final CountDownLatch ready = new CountDownLatch(timers.length);
+        for (int i = 0 ; i < timers.length ; i++)
+        {
+            final Timer timer = timers[i];
+            timer.requestReport(ready);
+        }
+
+        // TODO fail gracefully after timeout if a thread is stuck
+        if (!ready.await(2L, TimeUnit.MINUTES))
+            throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck");
+
+        // reports have been filled in by the timer threads, so merge them
+        List<TimingInterval> intervals = new ArrayList<>();
+        for (Timer timer : timers)
+            intervals.add(timer.report);
+
+        return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos());
+    }
+
+    // build a new timer and add it to the set of running timers
+    public Timer newTimer()
+    {
+        final Timer timer = new Timer();
+        timers.add(timer);
+        return timer;
+    }
+
+    public void start()
+    {
+        history = new TimingInterval(System.nanoTime());
+    }
+
+    public TimingInterval snapInterval() throws InterruptedException
+    {
+        final TimingInterval interval = snapInterval(rnd);
+        history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 50000, history.startNanos());
+        return interval;
+    }
+
+    public TimingInterval getHistory()
+    {
+        return history;
+    }
+
+}
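
The snapInterval/requestReport handshake above is a latch rendezvous: the snapping thread creates a CountDownLatch sized to the number of timers, each timer publishes its interval and counts down from its own thread (via maybeReport), and the snapping thread merges once the latch opens. A minimal sketch of that pattern under the same assumptions (hypothetical names, not code from this patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    final class Checkpoint
    {
        interface Worker<R>
        {
            void requestReport(CountDownLatch signal); // publish a report from the worker's own thread, then count down
            R report();
        }

        // ask every worker for a report, wait for all of them, then return the published results
        static <R> List<R> collect(List<? extends Worker<R>> workers, long timeout, TimeUnit unit) throws InterruptedException
        {
            CountDownLatch ready = new CountDownLatch(workers.size());
            for (Worker<R> w : workers)
                w.requestReport(ready);
            if (!ready.await(timeout, unit))
                throw new RuntimeException("timed out waiting for a worker to report");
            List<R> results = new ArrayList<>();
            for (Worker<R> w : workers)
                results.add(w.report()); // safe to read: each worker counted down only after publishing
            return results;
        }
    }

Having each worker publish from its own thread is what keeps the per-thread timing single-writer; the coordinator only reads after the latch has opened.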

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
new file mode 100644
index 0000000..04fb044
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -0,0 +1,132 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+// represents measurements taken over an interval of time
+// used for both single timer results and merged timer results
+public final class TimingInterval
+{
+    // nanos
+    private final long start;
+    private final long end;
+    public final long maxLatency;
+    public final long pauseLength;
+    public final long pauseStart;
+    public final long totalLatency;
+
+    // discrete
+    public final long keyCount;
+    public final long operationCount;
+
+    final SampleOfLongs sample;
+
+    TimingInterval(long time)
+    {
+        start = end = time;
+        maxLatency = totalLatency = 0;
+        keyCount = operationCount = 0;
+        pauseStart = pauseLength = 0;
+        sample = new SampleOfLongs(new long[0], 1d);
+    }
+    TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long keyCount, long totalLatency, long operationCount, SampleOfLongs sample)
+    {
+        this.start = start;
+        this.end = Math.max(end, start);
+        this.maxLatency = maxLatency;
+        this.keyCount = keyCount;
+        this.totalLatency = totalLatency;
+        this.operationCount = operationCount;
+        this.pauseStart = pauseStart;
+        this.pauseLength = pauseLength;
+        this.sample = sample;
+    }
+
+    // merge multiple timer intervals together
+    static TimingInterval merge(Random rnd, List<TimingInterval> intervals, int maxSamples, long start)
+    {
+        int operationCount = 0, keyCount = 0;
+        long maxLatency = 0, totalLatency = 0;
+        List<SampleOfLongs> latencies = new ArrayList<>();
+        long end = 0;
+        long pauseStart = 0, pauseEnd = Long.MAX_VALUE;
+        for (TimingInterval interval : intervals)
+        {
+            end = Math.max(end, interval.end);
+            operationCount += interval.operationCount;
+            maxLatency = Math.max(interval.maxLatency, maxLatency);
+            totalLatency += interval.totalLatency;
+            keyCount += interval.keyCount;
+            latencies.addAll(Arrays.asList(interval.sample));
+            if (interval.pauseLength > 0)
+            {
+                pauseStart = Math.max(pauseStart, interval.pauseStart);
+                pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+            }
+        }
+        if (pauseEnd < pauseStart)
+            pauseEnd = pauseStart = 0;
+        return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, keyCount, totalLatency, operationCount,
+                SampleOfLongs.merge(rnd, latencies, maxSamples));
+
+    }
+
+    public double realOpRate()
+    {
+        return operationCount / ((end - start) * 0.000000001d);
+    }
+
+    public double adjustedOpRate()
+    {
+        return operationCount / ((end - (start + pauseLength)) * 0.000000001d);
+    }
+
+    public double keyRate()
+    {
+        return keyCount / ((end - start) * 0.000000001d);
+    }
+
+    public double meanLatency()
+    {
+        return (totalLatency / (double) operationCount) * 0.000001d;
+    }
+
+    public double maxLatency()
+    {
+        return maxLatency * 0.000001d;
+    }
+
+    public long runTime()
+    {
+        return (end - start) / 1000000;
+    }
+
+    public double medianLatency()
+    {
+        return sample.medianLatency();
+    }
+
+    // 0 < rank < 1
+    public double rankLatency(float rank)
+    {
+        return sample.rankLatency(rank);
+    }
+
+    public final long endNanos()
+    {
+        return end;
+    }
+
+    public final long endMillis()
+    {
+        return end / 1000000;
+    }
+
+    public long startNanos()
+    {
+        return start;
+    }
+}
+
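
All durations above are in nanoseconds, so the rate and latency helpers only rescale. For example, an interval spanning 2 seconds (end - start = 2,000,000,000 ns) with operationCount = 10,000 gives realOpRate = 10000 / 2.0 = 5,000 ops/s, and a totalLatency of 4,000,000,000 ns over those operations gives meanLatency = (4e9 / 10000) * 0.000001 = 0.4 ms; runTime() and endMillis() likewise divide by 1,000,000 to convert nanoseconds to milliseconds.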

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java b/tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java
new file mode 100644
index 0000000..ac2d803
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Uncertainty.java
@@ -0,0 +1,81 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+// TODO: do not assume normal distribution of measurements.
+public class Uncertainty
+{
+
+    private int measurements;
+    private double sumsquares;
+    private double sum;
+    private double stdev;
+    private double mean;
+    private double uncertainty;
+
+    private CopyOnWriteArrayList<WaitForTargetUncertainty> waiting = new CopyOnWriteArrayList<>();
+
+    private static final class WaitForTargetUncertainty
+    {
+        final double targetUncertainty;
+        final int minMeasurements;
+        final int maxMeasurements;
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        private WaitForTargetUncertainty(double targetUncertainty, int minMeasurements, int maxMeasurements)
+        {
+            this.targetUncertainty = targetUncertainty;
+            this.minMeasurements = minMeasurements;
+            this.maxMeasurements = maxMeasurements;
+        }
+
+        void await() throws InterruptedException
+        {
+            latch.await();
+        }
+
+    }
+
+    public void update(double value)
+    {
+        measurements++;
+        sumsquares += value * value;
+        sum += value;
+        mean = sum / measurements;
+        stdev = Math.sqrt((sumsquares / measurements) - (mean * mean));
+        uncertainty = (stdev / Math.sqrt(measurements)) / mean;
+
+        for (WaitForTargetUncertainty waiter : waiting)
+        {
+            if ((uncertainty < waiter.targetUncertainty && measurements >= waiter.minMeasurements) || (measurements >= waiter.maxMeasurements))
+            {
+                waiter.latch.countDown();
+                // can safely remove as working over snapshot with COWArrayList
+                waiting.remove(waiter);
+            }
+        }
+    }
+
+    public void await(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
+    {
+        final WaitForTargetUncertainty wait = new WaitForTargetUncertainty(targetUncertainty, minMeasurements, maxMeasurements);
+        waiting.add(wait);
+        wait.await();
+    }
+
+    public double getUncertainty()
+    {
+        return uncertainty;
+    }
+
+    public void wakeAll()
+    {
+        for (WaitForTargetUncertainty waiting : this.waiting)
+        {
+            waiting.latch.countDown();
+            this.waiting.remove(waiting);
+        }
+    }
+
+}
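
The uncertainty tracked above is the relative standard error of the mean: stdev / sqrt(n) / mean, with stdev derived from the running sum and sum of squares (and, per the TODO, assuming roughly normal measurements). For example, after n = 25 measurements with mean 1000 and standard deviation 50, uncertainty = (50 / sqrt(25)) / 1000 = 0.01; a caller blocked in await(0.02, 10, 100) would then be released, since it has at least 10 measurements and the uncertainty is below its 0.02 target.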
