Updated Branches: refs/heads/trunk 6ca75ef9b -> 0b94b191d
add PBSPredictor consistency modeler patch by Peter Bailis and Shivaram Venkataraman; reviewed by jbellis for CASSANDRA-4261 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b94b191 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b94b191 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b94b191 Branch: refs/heads/trunk Commit: 0b94b191d803f2a59e39c0e14fca45f5fb2ceb65 Parents: 6ca75ef Author: Jonathan Ellis <[email protected]> Authored: Thu Sep 27 15:28:25 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Sep 27 15:28:25 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 5 + .../org/apache/cassandra/net/MessagingService.java | 34 +- .../cassandra/service/PBSPredictionResult.java | 127 +++ .../org/apache/cassandra/service/PBSPredictor.java | 636 +++++++++++++++ .../cassandra/service/PBSPredictorMBean.java | 35 + .../apache/cassandra/service/StorageService.java | 2 + src/java/org/apache/cassandra/tools/NodeCmd.java | 65 ++- src/java/org/apache/cassandra/tools/NodeProbe.java | 8 + .../org/apache/cassandra/tools/NodeToolHelp.yaml | 4 +- .../apache/cassandra/service/PBSPredictorTest.java | 114 +++ 11 files changed, 1023 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6d044af..fa8b8cb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * add PBSPredictor consistency modeler (CASSANDRA-4261) * remove vestiges of Thrift unframed mode (CASSANDRA-4729) * optimize single-row PK lookups (CASSANDRA-4710) * adjust blockFor calculation to account for pending ranges due to node 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 1477a4a..7c288cc 100644 --- a/build.xml +++ b/build.xml @@ -1134,6 +1134,11 @@ </testmacro> </target> + <target name="pbs-test" depends="build-test" description="Tests PBS predictor"> + <testmacro suitename="unit" inputdir="${test.unit.src}" + timeout="15000" filter="**/PBSPredictorTest.java"/> + </target> + <target name="long-test" depends="build-test" description="Execute functional tests"> <testmacro suitename="long" inputdir="${test.long.src}" timeout="${test.long.timeout}"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index a2c5939..63d6e38 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -36,18 +36,17 @@ import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; -import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.db.*; import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestAck2; import 
org.apache.cassandra.gms.GossipDigestSyn;
@@ -58,14 +57,12 @@ import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.service.AntiEntropyService;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -555,6 +552,17 @@ public final class MessagingService implements MessagingServiceMBean
     public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
     {
         String id = addCallback(cb, message, to, timeout);
+
+        // let PBSPredictor record the start time of write/read requests (no-op unless its logging is enabled)
+        if (cb instanceof AbstractWriteResponseHandler)
+        {
+            PBSPredictor.instance().startWriteOperation(id);
+        }
+        else if (cb instanceof ReadCallback)
+        {
+            PBSPredictor.instance().startReadOperation(id);
+        }
+
         sendOneWay(message, id, to);
         return id;
     }
@@ -695,6 +703,24 @@ public final class MessagingService implements MessagingServiceMBean
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
+
+        if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
+        {
+            // the callback may no longer be registered (e.g. it has already expired); in that case there
+            // is no request to attribute this response to, so skip logging instead of throwing an NPE here
+            CallbackInfo callbackInfo = MessagingService.instance().getRegisteredCallback(id);
+            IMessageCallback cb = callbackInfo == null ? null : callbackInfo.callback;
+
+            if (cb instanceof AbstractWriteResponseHandler)
+            {
+                PBSPredictor.instance().logWriteResponse(id, timestamp);
+            }
+            else if (cb instanceof ReadCallback)
+            {
+                PBSPredictor.instance().logReadResponse(id, timestamp);
+            }
+        }
+
         stage.execute(runnable);
     }
 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictionResult.java b/src/java/org/apache/cassandra/service/PBSPredictionResult.java
new file mode 100644
index 0000000..92c5491
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictionResult.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+
+/**
+ * The outcome of a single {@link PBSPredictor} prediction: the simulated
+ * configuration (N, R, W, time since write, tolerated staleness) together
+ * with the predicted probability of a consistent read and the simulated
+ * read and write latencies.
+ * <p/>
+ * Instances are returned through {@link PBSPredictorMBean#doPrediction} to
+ * JMX clients such as nodetool, so the class is {@link Serializable}; the
+ * explicit serialVersionUID keeps the serialized form stable across builds.
+ */
+public class PBSPredictionResult implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    // the simulated replication factor and read/write consistency levels
+    private final int n;
+    private final int r;
+    private final int w;
+
+    // milliseconds after the write completed at which the simulated read was issued
+    private final float timeSinceWrite;
+    // tolerated staleness k, in versions
+    private final int numberVersionsStale;
+
+    // predicted probability that a read returns a value within k versions of the last write
+    private final float consistencyProbability;
+
+    // simulated latencies in milliseconds; each *Percentile field holds the
+    // percentile (in [0, 1]) at which the matching *Value was taken
+    private final float averageReadLatency;
+    private final float averageWriteLatency;
+    private final long percentileReadLatencyValue;
+    private final float percentileReadLatencyPercentile;
+    private final long percentileWriteLatencyValue;
+    private final float percentileWriteLatencyPercentile;
+
+    public PBSPredictionResult(int n,
+                               int r,
+                               int w,
+                               float timeSinceWrite,
+                               int numberVersionsStale,
+                               float consistencyProbability,
+                               float averageReadLatency,
+                               float averageWriteLatency,
+                               long percentileReadLatencyValue,
+                               float percentileReadLatencyPercentile,
+                               long percentileWriteLatencyValue,
+                               float percentileWriteLatencyPercentile)
+    {
+        this.n = n;
+        this.r = r;
+        this.w = w;
+        this.timeSinceWrite = timeSinceWrite;
+        this.numberVersionsStale = numberVersionsStale;
+        this.consistencyProbability = consistencyProbability;
+        this.averageReadLatency = averageReadLatency;
+        this.averageWriteLatency = averageWriteLatency;
+        this.percentileReadLatencyValue = percentileReadLatencyValue;
+        this.percentileReadLatencyPercentile = percentileReadLatencyPercentile;
+        this.percentileWriteLatencyValue = percentileWriteLatencyValue;
+        this.percentileWriteLatencyPercentile = percentileWriteLatencyPercentile;
+    }
+
+    /** @return the simulated replication factor N */
+    public int getN()
+    {
+        return n;
+    }
+
+    /** @return the simulated read consistency level R */
+    public int getR()
+    {
+        return r;
+    }
+
+    /** @return the simulated write consistency level W */
+    public int getW()
+    {
+        return w;
+    }
+
+    /** @return milliseconds after the write completed at which the read was issued */
+    public float getTimeSinceWrite()
+    {
+        return timeSinceWrite;
+    }
+
+    /** @return the tolerated staleness k, in versions */
+    public int getNumberVersionsStale()
+    {
+        return numberVersionsStale;
+    }
+
+    /** @return the predicted probability of a read within k versions of the last write */
+    public float getConsistencyProbability()
+    {
+        return consistencyProbability;
+    }
+
+    /** @return the mean simulated read latency, in milliseconds */
+    public float getAverageReadLatency()
+    {
+        return averageReadLatency;
+    }
+
+    /** @return the mean simulated write latency, in milliseconds */
+    public float getAverageWriteLatency()
+    {
+        return averageWriteLatency;
+    }
+
+    /** @return the simulated read latency at {@link #getPercentileReadLatencyPercentile()}, in milliseconds */
+    public long getPercentileReadLatencyValue()
+    {
+        return percentileReadLatencyValue;
+    }
+
+    /** @return the percentile (in [0, 1]) of the reported read latency value */
+    public float getPercentileReadLatencyPercentile()
+    {
+        return percentileReadLatencyPercentile;
+    }
+
+    /** @return the simulated write latency at {@link #getPercentileWriteLatencyPercentile()}, in milliseconds */
+    public long getPercentileWriteLatencyValue()
+    {
+        return percentileWriteLatencyValue;
+    }
+
+    /** @return the percentile (in [0, 1]) of the reported write latency value */
+    public float getPercentileWriteLatencyPercentile()
+    {
+        return percentileWriteLatencyPercentile;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictor.java b/src/java/org/apache/cassandra/service/PBSPredictor.java
new file mode 100644
index 0000000..4bd6381
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictor.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.net.MessageIn;
+
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs latency and consistency predictions as described in
+ * <a href="http://arxiv.org/pdf/1204.6082.pdf">
+ * "Probabilistically Bounded Staleness for Practical Partial Quorums"</a>
+ * by Bailis et al. in VLDB 2012. The predictions are of the form:
+ * <p/>
+ * <i>With ReplicationFactor <tt>N</tt>, read consistency level of
+ * <tt>R</tt>, and write consistency level <tt>W</tt>, after
+ * <tt>t</tt> milliseconds, <tt>p</tt>% of reads will return a version
+ * within <tt>k</tt> versions of the last written; this should result
+ * in a latency of <tt>L</tt> ms.</i>
+ * <p/>
+ * These predictions should be used as a rough guideline for system
+ * operators. This interface is exposed through nodetool.
+ * <p/>
+ * The class accomplishes this by measuring latencies for reads and
+ * writes, then using Monte Carlo simulation to predict behavior under
+ * a given N, R, and W based on those latencies.
+ * <p/>
+ * We capture four distributions:
+ * <ul>
+ * <li>
+ * <tt>W</tt>: time from when the coordinator sends a mutation to the time
+ * that a replica begins to serve the new value(s)
+ * </li>
+ * <li>
+ * <tt>A</tt>: time from when a replica accepting a mutation sends an
+ * acknowledgment to the time the coordinator hears of it
+ * </li>
+ * <li>
+ * <tt>R</tt>: time from when the coordinator sends a read request to the time
+ * that the replica performs the read
+ * </li>
+ * <li>
+ * <tt>S</tt>: time from when the replica sends a read response to the time
+ * when the coordinator receives it
+ * </li>
+ * </ul>
+ * <p/>
+ * <tt>A</tt> and <tt>S</tt> are mostly network-bound, while W and R
+ * depend on both the network and local processing time.
+ * <p/>
+ * <b>Caveats:</b>
+ * Prediction is only as good as the latencies collected. Accurate
+ * prediction requires synchronizing clocks between replicas. We
+ * collect a running sample of latencies, but, if latencies change
+ * dramatically, predictions will be off.
+ * <p/>
+ * The predictions are conservative, or worst-case, meaning we may
+ * predict more staleness than in practice in the following ways:
+ * <ul>
+ * <li>
+ * We do not account for read repair.
+ * </li>
+ * <li>
+ * We do not account for Merkle tree exchange.
+ * </li>
+ * <li>
+ * Multi-version staleness is particularly conservative.
+ * </li>
+ * <li>
+ * We simulate non-local reads and writes. We assume that the
+ * coordinating Cassandra node is not itself a replica for a given key.
+ * </li>
+ * </ul>
+ * <p/>
+ * The predictions are optimistic in the following ways:
+ * <ul>
+ * <li>
+ * We do not predict the impact of node failure.
+ * </li>
+ * <li>
+ * We do not model hinted handoff.
+ * </li>
+ * </ul>
+ *
+ * @see org.apache.cassandra.thrift.ConsistencyLevel
+ * @see org.apache.cassandra.locator.AbstractReplicationStrategy
+ */
+
+public class PBSPredictor implements PBSPredictorMBean
+{
+    private static final Logger logger = LoggerFactory.getLogger(PBSPredictor.class);
+
+    public static final String MBEAN_NAME = "org.apache.cassandra.service:type=PBSPredictor";
+    private static final boolean DEFAULT_DO_LOG_LATENCIES = false;
+    private static final int DEFAULT_MAX_LOGGED_LATENCIES = 10000;
+    private static final int DEFAULT_NUMBER_TRIALS_PREDICTION = 10000;
+
+    /*
+     * We record a fixed size set of WARS latencies for read and
+     * mutation operations. We store the order in which each
+     * operation arrived and, once the limit is exceeded, evict
+     * the oldest operations first (FIFO).
+     *
+     * This information is stored as a mapping from messageIDs to
+     * latencies.
+     */
+
+    /*
+     * Helper class which minimizes the number of HashMaps we maintain.
+     * For a given messageId, this class maintains the startTime of the message,
+     * and a queue for send times and reply times.
+     *
+     * sendLats corresponds to W and R, while replyLats is used for A and S.
+     *
+     * It is static because it never needs the enclosing predictor.
+     */
+    private static class MessageLatencyCollection
+    {
+        private final long startTime;
+        private final Collection<Long> sendLats = new ConcurrentLinkedQueue<Long>();
+        private final Collection<Long> replyLats = new ConcurrentLinkedQueue<Long>();
+
+        MessageLatencyCollection(long startTime)
+        {
+            this.startTime = startTime;
+        }
+
+        void addSendLat(long sendLat)
+        {
+            sendLats.add(sendLat);
+        }
+
+        void addReplyLat(long replyLat)
+        {
+            replyLats.add(replyLat);
+        }
+
+        Collection<Long> getSendLats()
+        {
+            return sendLats;
+        }
+
+        Collection<Long> getReplyLats()
+        {
+            return replyLats;
+        }
+
+        long getStartTime()
+        {
+            return startTime;
+        }
+    }
+
+    // insertion order of logged message ids, used for FIFO eviction
+    private final Queue<String> writeMessageIds = new LinkedBlockingQueue<String>();
+    private final Queue<String> readMessageIds = new LinkedBlockingQueue<String>();
+
+    private final Map<String, MessageLatencyCollection> messageIdToWriteLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
+    private final Map<String, MessageLatencyCollection> messageIdToReadLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
+
+    private Random random;
+    private boolean initialized = false;
+
+    // these settings can be changed over JMX while the messaging code reads them,
+    // so they are volatile to make updates visible across threads
+    private volatile boolean logLatencies = DEFAULT_DO_LOG_LATENCIES;
+    private volatile int maxLoggedLatencies = DEFAULT_MAX_LOGGED_LATENCIES;
+    private volatile int numberTrialsPrediction = DEFAULT_NUMBER_TRIALS_PREDICTION;
+
+    private static final PBSPredictor instance = new PBSPredictor();
+
+    public static PBSPredictor instance()
+    {
+        return instance;
+    }
+
+    private PBSPredictor()
+    {
+        init();
+    }
+
+    public void enableConsistencyPredictionLogging()
+    {
+        logLatencies = true;
+    }
+
+    public void disableConsistencyPredictionLogging()
+    {
+        logLatencies = false;
+    }
+
+    public boolean isLoggingEnabled()
+    {
+        return logLatencies;
+    }
+
+    /**
+     * @param maxLogged maximum number of read operations, and separately of write
+     *                  operations, whose latencies are retained; must be positive
+     * @throws IllegalArgumentException if maxLogged is not positive
+     */
+    public void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged)
+    {
+        // a non-positive bound would discard every logged operation, making prediction impossible
+        if (maxLogged < 1)
+            throw new IllegalArgumentException("maxLogged must be positive");
+        maxLoggedLatencies = maxLogged;
+    }
+
+    /**
+     * @param numTrials number of Monte Carlo trials run per prediction; must be positive
+     * @throws IllegalArgumentException if numTrials is not positive
+     */
+    public void setNumberTrialsForConsistencyPrediction(int numTrials)
+    {
+        // zero trials would make doPrediction divide by zero and take percentiles of empty lists
+        if (numTrials < 1)
+            throw new IllegalArgumentException("numTrials must be positive");
+        numberTrialsPrediction = numTrials;
+    }
+
+    /**
+     * Registers this predictor with the platform MBean server. Only the first
+     * successful call has any effect; later calls return immediately.
+     */
+    public synchronized void init()
+    {
+        if (initialized)
+            return;
+
+        random = new Random();
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(PBSPredictor.MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        initialized = true;
+    }
+
+
+    // used for random sampling from the latencies
+    private long getRandomElement(List<Long> list)
+    {
+        if (list.isEmpty())
+            throw new RuntimeException("Not enough data for prediction");
+        return list.get(random.nextInt(list.size()));
+    }
+
+    // used for calculating the average latency of a read or write operation
+    // given a set of simulated latencies
+    private float listAverage(List<Long> list)
+    {
+        long accum = 0;
+        for (long value : list)
+            accum += value;
+        return (float) accum / list.size();
+    }
+
+    // calculate the percentile entry (percentile in [0, 1]) of a list;
+    // note that this sorts the list in place
+    private long getPercentile(List<Long> list, float percentile)
+    {
+        Collections.sort(list);
+        // clamp so that percentile == 1 selects the maximum instead of
+        // indexing one past the end of the list
+        int index = Math.min((int) (list.size() * percentile), list.size() - 1);
+        return list.get(index);
+    }
+
+    /*
+     * For our trials, sample the latency for the (replicaNumber)th
+     * reply for one of WARS. replicaNumber is 1-based, matching the keys
+     * produced by getOrderedLatencies.
+     * If replicaNumber > the number of replicas we have data for
+     * (say we have data for ReplicationFactor 2 but ask for N=3)
+     * then we sample from the latencies of a randomly chosen reply number.
+     */
+    private long getRandomLatencySample(Map<Integer, List<Long>> samples, int replicaNumber)
+    {
+        if (samples.containsKey(replicaNumber))
+        {
+            return getRandomElement(samples.get(replicaNumber));
+        }
+
+        List<Integer> replyNumbers = new ArrayList<Integer>(samples.keySet());
+        return getRandomElement(samples.get(replyNumbers.get(random.nextInt(replyNumbers.size()))));
+    }
+
+    /*
+     * To perform the prediction, we randomly sample from the
+     * collected WARS latencies, simulating writes followed by reads
+     * exactly t milliseconds afterwards. We count the number of
+     * reordered reads and writes to calculate the probability of
+     * staleness along with recording operation latencies.
+     */
+
+
+    public PBSPredictionResult doPrediction(int n,
+                                            int r,
+                                            int w,
+                                            float timeSinceWrite,
+                                            int numberVersionsStale,
+                                            float percentileLatency) throws Exception
+    {
+        // r and w are used as 1-based positions into each trial's sorted replica
+        // latencies (get(r - 1), get(w - 1)), so both must lie in [1, n]
+        if (r > n)
+            throw new IllegalArgumentException("r must be less than or equal to n");
+        if (r < 1)
+            throw new IllegalArgumentException("r must be positive");
+        if (w > n)
+            throw new IllegalArgumentException("w must be less than or equal to n");
+        if (w < 1)
+            throw new IllegalArgumentException("w must be positive");
+        if (percentileLatency < 0 || percentileLatency > 1)
+            throw new IllegalArgumentException("percentileLatency must be between 0 and 1 inclusive");
+        if (numberVersionsStale < 1)
+            throw new IllegalArgumentException("numberVersionsStale must be positive");
+
+        if (!logLatencies)
+            throw new InvalidRequestException("Latency logging is not enabled");
+
+        // read once so that a concurrent JMX update cannot change the trial count mid-prediction
+        int numberTrials = numberTrialsPrediction;
+
+        // get a mapping of {replica number : latency} for each of WARS
+        Map<Integer, List<Long>> wLatencies = getOrderedWLatencies();
+        Map<Integer, List<Long>> aLatencies = getOrderedALatencies();
+        Map<Integer, List<Long>> rLatencies = getOrderedRLatencies();
+        Map<Integer, List<Long>> sLatencies = getOrderedSLatencies();
+
+        if (wLatencies.isEmpty() || aLatencies.isEmpty())
+            throw new InvalidRequestException("No write latencies have been recorded so far. Run some (non-local) inserts.");
+
+        if (rLatencies.isEmpty() || sLatencies.isEmpty())
+            throw new InvalidRequestException("No read latencies have been recorded so far. Run some (non-local) reads.");
+
+        // storage for simulated read and write latencies
+        List<Long> readLatencies = new ArrayList<Long>(numberTrials);
+        List<Long> writeLatencies = new ArrayList<Long>(numberTrials);
+
+        long consistentReads = 0;
+
+        // storage for latencies for each replica for a given Monte Carlo trial
+        // arr[i] will hold the ith replica's latency for one of WARS
+        List<Long> trialWLatencies = new ArrayList<Long>();
+        List<Long> trialRLatencies = new ArrayList<Long>();
+
+        List<Long> replicaWriteLatencies = new ArrayList<Long>();
+        List<Long> replicaReadLatencies = new ArrayList<Long>();
+
+        //run repeated trials and observe staleness
+        for (int i = 0; i < numberTrials; ++i)
+        {
+            //simulate sending a write to N replicas then sending a
+            //read to N replicas and record the latencies by randomly
+            //sampling from gathered latencies
+            for (int replicaNo = 0; replicaNo < n; ++replicaNo)
+            {
+                // the latency maps are keyed by 1-based response number
+                long trialWLatency = getRandomLatencySample(wLatencies, replicaNo + 1);
+                long trialALatency = getRandomLatencySample(aLatencies, replicaNo + 1);
+
+                trialWLatencies.add(trialWLatency);
+
+                replicaWriteLatencies.add(trialWLatency + trialALatency);
+            }
+
+            // reads are only sent to R replicas - so pick R random read and
+            // response latencies
+            for (int replicaNo = 0; replicaNo < r; ++replicaNo)
+            {
+                long trialRLatency = getRandomLatencySample(rLatencies, replicaNo + 1);
+                long trialSLatency = getRandomLatencySample(sLatencies, replicaNo + 1);
+
+                trialRLatencies.add(trialRLatency);
+
+                replicaReadLatencies.add(trialRLatency + trialSLatency);
+            }
+
+            // the write latency for this trial is the time it takes
+            // for the wth replica to respond (W+A)
+            Collections.sort(replicaWriteLatencies);
+            long writeLatency = replicaWriteLatencies.get(w - 1);
+            writeLatencies.add(writeLatency);
+
+            List<Long> sortedReplicaReadLatencies = new ArrayList<Long>(replicaReadLatencies);
+            Collections.sort(sortedReplicaReadLatencies);
+
+            // the read latency for this trial is the time it takes
+            // for the rth replica to respond (R+S)
+            readLatencies.add(sortedReplicaReadLatencies.get(r - 1));
+
+            // were all of the read responses reordered?
+
+            // for each of the first r messages (the ones the
+            // coordinator will pick from):
+            //--if the read message came in after this replica saw the
+            //  write, it will be consistent
+            //--each read request is sent at time
+            //  writeLatency+timeSinceWrite
+
+            for (int responseNumber = 0; responseNumber < r; ++responseNumber)
+            {
+                int replicaNumber = replicaReadLatencies.indexOf(sortedReplicaReadLatencies.get(responseNumber));
+
+                if (writeLatency + timeSinceWrite + trialRLatencies.get(replicaNumber) >=
+                    trialWLatencies.get(replicaNumber))
+                {
+                    consistentReads++;
+                    break;
+                }
+
+                // tombstone this replica in the case that we have
+                // duplicate read latencies
+                replicaReadLatencies.set(replicaNumber, -1L);
+            }
+
+            // clear storage for the next trial
+            trialWLatencies.clear();
+            trialRLatencies.clear();
+
+            replicaReadLatencies.clear();
+            replicaWriteLatencies.clear();
+        }
+
+        float oneVersionConsistencyProbability = (float) consistentReads / numberTrials;
+
+        // to calculate multi-version staleness, we exponentiate the staleness probability by the number of versions
+        float consistencyProbability = (float) (1 - Math.pow((double) (1 - oneVersionConsistencyProbability),
+                                                             numberVersionsStale));
+
+        float averageWriteLatency = listAverage(writeLatencies);
+        float averageReadLatency = listAverage(readLatencies);
+
+        long percentileWriteLatency = getPercentile(writeLatencies, percentileLatency);
+        long percentileReadLatency = getPercentile(readLatencies, percentileLatency);
+
+        return new PBSPredictionResult(n,
+                                       r,
+                                       w,
+                                       timeSinceWrite,
+                                       numberVersionsStale,
+                                       consistencyProbability,
+                                       averageReadLatency,
+                                       averageWriteLatency,
+                                       percentileReadLatency,
+                                       percentileLatency,
+                                       percentileWriteLatency,
+                                       percentileLatency);
+    }
+
+    public void startWriteOperation(String id)
+    {
+        if (!logLatencies)
+            return;
+
+        startWriteOperation(id, System.currentTimeMillis());
+    }
+
+    public void startWriteOperation(String id, long startTime)
+    {
+        if (!logLatencies)
+            return;
+
+        assert (!messageIdToWriteLats.containsKey(id));
+        startOperation(id, startTime, writeMessageIds, messageIdToWriteLats);
+    }
+
+    public void startReadOperation(String id)
+    {
+        if (!logLatencies)
+            return;
+
+        startReadOperation(id, System.currentTimeMillis());
+    }
+
+    public void startReadOperation(String id, long startTime)
+    {
+        if (!logLatencies)
+            return;
+
+        assert (!messageIdToReadLats.containsKey(id));
+        startOperation(id, startTime, readMessageIds, messageIdToReadLats);
+    }
+
+    public void logWriteResponse(String id, long constructionTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logWriteResponse(id, constructionTime, System.currentTimeMillis());
+    }
+
+    public void logWriteResponse(String id, long responseCreationTime, long receivedTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logResponse(messageIdToWriteLats.get(id), responseCreationTime, receivedTime);
+    }
+
+    public void logReadResponse(String id, long constructionTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logReadResponse(id, constructionTime, System.currentTimeMillis());
+    }
+
+    public void logReadResponse(String id, long responseCreationTime, long receivedTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logResponse(messageIdToReadLats.get(id), responseCreationTime, receivedTime);
+    }
+
+    // Track a newly started operation, evicting the oldest tracked operations (FIFO)
+    // while more than maxLoggedLatencies are held. The bound is sloppy under
+    // concurrent calls, but that's acceptable for our purposes.
+    private void startOperation(String id,
+                                long startTime,
+                                Queue<String> messageIds,
+                                Map<String, MessageLatencyCollection> messageIdToLats)
+    {
+        // register the entry before queueing its id, so evicting this id
+        // (from any thread) always removes the entry as well
+        messageIdToLats.put(id, new MessageLatencyCollection(startTime));
+        messageIds.add(id);
+
+        // loop rather than evicting a single entry, so that lowering
+        // maxLoggedLatencies at runtime also shrinks what is already logged
+        while (messageIds.size() > maxLoggedLatencies)
+        {
+            String toEvict = messageIds.poll();
+            if (toEvict == null)
+                break;
+            messageIdToLats.remove(toEvict);
+        }
+    }
+
+    // Record the send (W or R) and reply (A or S) latencies of one response; responses
+    // for operations that are not (or no longer) tracked are ignored. Negative
+    // differences (presumably from unsynchronized clocks) are clamped to zero.
+    private static void logResponse(MessageLatencyCollection lats, long responseCreationTime, long receivedTime)
+    {
+        if (lats == null)
+            return;
+
+        long startTime = lats.getStartTime();
+        lats.addSendLat(Math.max(0, responseCreationTime - startTime));
+        lats.addReplyLat(Math.max(0, receivedTime - responseCreationTime));
+    }
+
+    Map<Integer, List<Long>> getOrderedWLatencies()
+    {
+        Collection<Collection<Long>> allWLatencies = new ArrayList<Collection<Long>>();
+        for (MessageLatencyCollection wlc : messageIdToWriteLats.values())
+            allWLatencies.add(wlc.getSendLats());
+        return getOrderedLatencies(allWLatencies);
+    }
+
+    Map<Integer, List<Long>> getOrderedALatencies()
+    {
+        Collection<Collection<Long>> allALatencies = new ArrayList<Collection<Long>>();
+        for (MessageLatencyCollection wlc : messageIdToWriteLats.values())
+            allALatencies.add(wlc.getReplyLats());
+        return getOrderedLatencies(allALatencies);
+    }
+
+    Map<Integer, List<Long>> getOrderedRLatencies()
+    {
+        Collection<Collection<Long>> allRLatencies = new ArrayList<Collection<Long>>();
+        for (MessageLatencyCollection rlc : messageIdToReadLats.values())
+            allRLatencies.add(rlc.getSendLats());
+        return getOrderedLatencies(allRLatencies);
+    }
+
+    Map<Integer, List<Long>> getOrderedSLatencies()
+    {
+        Collection<Collection<Long>> allSLatencies = new ArrayList<Collection<Long>>();
+        for (MessageLatencyCollection rlc : messageIdToReadLats.values())
+            allSLatencies.add(rlc.getReplyLats());
+        return getOrderedLatencies(allSLatencies);
+    }
+
+    // Return the collected latencies indexed by (1-based) response number instead of by messageID
+    private Map<Integer, List<Long>> getOrderedLatencies(Collection<Collection<Long>> latencyLists)
+    {
+        Map<Integer, List<Long>> ret = new HashMap<Integer, List<Long>>();
+
+        // N may vary
+        int maxResponses = 0;
+
+        for (Collection<Long> latencies : latencyLists)
+        {
+            List<Long> sortedLatencies = new ArrayList<Long>(latencies);
+            Collections.sort(sortedLatencies);
+
+            if (sortedLatencies.size() > maxResponses)
+            {
+                for (int i = maxResponses + 1; i <= sortedLatencies.size(); ++i)
+                {
+                    ret.put(i, new ArrayList<Long>());
+                }
+
+                maxResponses = sortedLatencies.size();
+            }
+
+            // indexing by 0 is awkward since we're talking about the ith response
+            for (int i = 1; i <= sortedLatencies.size(); ++i)
+            {
+                ret.get(i).add(sortedLatencies.get(i - 1));
+            }
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictorMBean.java b/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
new file mode 100644
index 0000000..8c7773d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+/**
+ * JMX management interface of {@link PBSPredictor}; nodetool's
+ * predictconsistency command drives predictions through it.
+ */
+public interface PBSPredictorMBean
+{
+    /**
+     * Predict consistency and latency for one configuration from the
+     * latencies logged so far.
+     *
+     * @param n                   replication factor
+     * @param r                   read consistency level (replicas read)
+     * @param w                   write consistency level (replicas written)
+     * @param timeSinceWrite      milliseconds after the write at which the read is issued
+     * @param numberVersionsStale tolerated staleness k, in versions
+     * @param percentileLatency   latency percentile to report, in [0, 1]
+     */
+    PBSPredictionResult doPrediction(int n,
+                                     int r,
+                                     int w,
+                                     float timeSinceWrite,
+                                     int numberVersionsStale,
+                                     float percentileLatency) throws Exception;
+
+    /** Start recording read/write latencies for use by predictions. */
+    void enableConsistencyPredictionLogging();
+
+    /** Stop recording read/write latencies. */
+    void disableConsistencyPredictionLogging();
+
+    /** Set how many operations' latencies are retained, for reads and for writes. */
+    void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged);
+
+    /** Set the number of Monte Carlo trials run per prediction. */
+    void setNumberTrialsForConsistencyPrediction(int numTrials);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e800d88..723a838 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -405,6 +405,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             throw new AssertionError(e);
         }
 
+        PBSPredictor.instance().init();
+
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Loading persisted ring state");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 704be4d..ae65832 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -33,6 +33,12 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Maps;
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.CacheServiceMBean;
+import
org.apache.cassandra.service.PBSPredictionResult; +import org.apache.cassandra.service.PBSPredictorMBean; +import org.apache.cassandra.service.StorageProxyMBean; + import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.Table; @@ -137,7 +143,8 @@ public class NodeCmd DESCRIBERING, RANGEKEYSAMPLE, REBUILD_INDEX, - RESETLOCALSCHEMA + RESETLOCALSCHEMA, + PREDICTCONSISTENCY } @@ -866,6 +873,48 @@ public class NodeCmd outs.println(probe.isThriftServerRunning() ? "running" : "not running"); } + public void predictConsistency(Integer replicationFactor, + Integer timeAfterWrite, + Integer numVersions, + Float percentileLatency, + PrintStream output) + { + PBSPredictorMBean predictorMBean = probe.getPBSPredictorMBean(); + + for(int r = 1; r <= replicationFactor; ++r) { + for(int w = 1; w <= replicationFactor; ++w) { + if(w+r > replicationFactor+1) + continue; + + try { + PBSPredictionResult result = predictorMBean.doPrediction(replicationFactor, + r, + w, + timeAfterWrite, + numVersions, + percentileLatency); + + if(r == 1 && w == 1) { + output.printf("%dms after a given write, with maximum version staleness of k=%d\n", timeAfterWrite, numVersions); + } + + output.printf("N=%d, R=%d, W=%d\n", replicationFactor, r, w); + output.printf("Probability of consistent reads: %f\n", result.getConsistencyProbability()); + output.printf("Average read latency: %fms (%.3fth %%ile %dms)\n", result.getAverageReadLatency(), + result.getPercentileReadLatencyPercentile()*100, + result.getPercentileReadLatencyValue()); + output.printf("Average write latency: %fms (%.3fth %%ile %dms)\n\n", result.getAverageWriteLatency(), + result.getPercentileWriteLatencyPercentile()*100, + result.getPercentileWriteLatencyValue()); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + return; + } + } + } + } + public static void main(String[] args) throws IOException, 
InterruptedException, ConfigurationException, ParseException { CommandLineParser parser = new PosixParser(); @@ -1134,6 +1183,20 @@ public class NodeCmd nodeCmd.printRangeKeySample(System.out); break; + case PREDICTCONSISTENCY: + if (arguments.length < 2) { badUse("Requires replication factor and time"); } + int numVersions = 1; + if (arguments.length == 3) { numVersions = Integer.parseInt(arguments[2]); } + float percentileLatency = .999f; + if (arguments.length == 4) { percentileLatency = Float.parseFloat(arguments[3]); } + + nodeCmd.predictConsistency(Integer.parseInt(arguments[0]), + Integer.parseInt(arguments[1]), + numVersions, + percentileLatency, + System.out); + break; + default : throw new RuntimeException("Unreachable code."); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 814e6d1..d5968c1 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -73,6 +73,7 @@ public class NodeProbe public MessagingServiceMBean msProxy; private FailureDetectorMBean fdProxy; private CacheServiceMBean cacheService; + private PBSPredictorMBean PBSPredictorProxy; private StorageProxyMBean spProxy; /** @@ -142,6 +143,8 @@ public class NodeProbe { ObjectName name = new ObjectName(ssObjName); ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); + name = new ObjectName(PBSPredictor.MBEAN_NAME); + PBSPredictorProxy = JMX.newMBeanProxy(mbeanServerConn, name, PBSPredictorMBean.class); name = new ObjectName(MessagingService.MBEAN_NAME); msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class); name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME); @@ -714,6 +717,11 @@ public class NodeProbe return 
ssProxy.describeRingJMX(keyspaceName); } + public PBSPredictorMBean getPBSPredictorMBean() + { + return PBSPredictorProxy; + } + public void rebuild(String sourceDc) { ssProxy.rebuild(sourceDc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml index f2e6f9d..bbea055 100644 --- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml +++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml @@ -149,4 +149,6 @@ commands: - name: getsstables <keyspace> <cf> <key> help: | Print the sstable filenames that own the key - + - name: predictconsistency <replication_factor> <time> [versions] [latency_percentile] + help: | + Predict latency and consistency "t" ms after writes http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/test/unit/org/apache/cassandra/service/PBSPredictorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java new file mode 100644 index 0000000..92e863d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.service; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class PBSPredictorTest +{ + private static PBSPredictor predictor = PBSPredictor.instance(); + + private void createWriteResponse(long W, long A, String id) + { + predictor.startWriteOperation(id, 0); + predictor.logWriteResponse(id, W, W+A); + } + + private void createReadResponse(long R, long S, String id) + { + predictor.startReadOperation(id, 0); + predictor.logReadResponse(id, R, R+S); + } + + @Test + public void testDoPrediction() + { + try { + predictor.enableConsistencyPredictionLogging(); + predictor.init(); + + /* + Ensure accuracy given a set of basic latencies + Predictions here match a prior Python implementation + */ + + for (int i = 0; i < 10; ++i) + { + createWriteResponse(10, 0, String.format("W%d", i)); + createReadResponse(0, 0, String.format("R%d", i)); + } + + for (int i = 0; i < 10; ++i) + { + createWriteResponse(0, 0, String.format("WS%d", i)); + } + + // 10ms after write + PBSPredictionResult result = predictor.doPrediction(2,1,1,10.0f,1, 0.99f); + + assertEquals(1, result.getConsistencyProbability(), 0); + assertEquals(2.5, result.getAverageWriteLatency(), .5); + + // 0ms after write + result = predictor.doPrediction(2,1,1,0f,1, 0.99f); + + assertEquals(.75, result.getConsistencyProbability(), 0.05); + + // k=5 versions staleness + result = predictor.doPrediction(2,1,1,5.0f,5, 0.99f); + assertEquals(.98, result.getConsistencyProbability(), 0.05); + assertEquals(2.5, result.getAverageWriteLatency(), .5); + + for 
(int i = 0; i < 10; ++i) + { + createWriteResponse(20, 0, String.format("WL%d", i)); + } + + // 5ms after write + result = predictor.doPrediction(2,1,1,5.0f,1, 0.99f); + + assertEquals(.67, result.getConsistencyProbability(), .05); + + // N = 5 + result = predictor.doPrediction(5,1,1,5.0f,1, 0.99f); + + assertEquals(.42, result.getConsistencyProbability(), .05); + assertEquals(1.33, result.getAverageWriteLatency(), .5); + + for (int i = 0; i < 10; ++i) + { + createWriteResponse(100, 100, String.format("WVL%d", i)); + createReadResponse(100, 100, String.format("RL%d", i)); + } + + result = predictor.doPrediction(2,1,1,0f,1, 0.99f); + + assertEquals(.860, result.getConsistencyProbability(), .05); + assertEquals(26.5, result.getAverageWriteLatency(), 1); + assertEquals(100.33, result.getAverageReadLatency(), 4); + + result = predictor.doPrediction(2,2,1,0f,1, 0.99f); + + assertEquals(1, result.getConsistencyProbability(), 0); + } catch (Exception e) { + fail(e.getMessage()); + } + } +}
