Updated Branches: refs/heads/trunk eba27a641 -> 0c7141ee8
remove PBSPredictor patch by jbellis; reviewed by marcuse for CASSANDRA-5455 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c7141ee Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c7141ee Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c7141ee Branch: refs/heads/trunk Commit: 0c7141ee8afa05d46680ec22635a5bf41f827b75 Parents: eba27a6 Author: Jonathan Ellis <[email protected]> Authored: Thu Apr 11 11:29:33 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Apr 11 11:29:41 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 5 - .../org/apache/cassandra/net/MessagingService.java | 24 - .../cassandra/service/PBSPredictionResult.java | 127 --- .../org/apache/cassandra/service/PBSPredictor.java | 630 --------------- .../cassandra/service/PBSPredictorMBean.java | 35 - .../apache/cassandra/service/StorageService.java | 2 - src/java/org/apache/cassandra/tools/NodeCmd.java | 59 -- src/java/org/apache/cassandra/tools/NodeProbe.java | 8 - .../apache/cassandra/service/PBSPredictorTest.java | 114 --- 10 files changed, 1 insertions(+), 1004 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2306887..48b91da 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0 + * removed PBSPredictor (CASSANDRA-5455) * CAS support (CASSANDRA-5062, ) * Leveled compaction performs size-tiered compactions in L0 (CASSANDRA-5371) * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 86178c3..8889794 100644 --- a/build.xml +++ b/build.xml @@ -1163,11 +1163,6 @@ </testmacro> </target> - <target name="pbs-test" depends="build-test" description="Tests PBS predictor"> - <testmacro suitename="unit" inputdir="${test.unit.src}" - timeout="15000" filter="**/PBSPredictorTest.java"/> - </target> - <target name="long-test" depends="build-test" description="Execute functional tests"> <testmacro suitename="long" inputdir="${test.long.src}" timeout="${test.long.timeout}"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index d9688a9..c23f566 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -591,16 +591,6 @@ public final class MessagingService implements MessagingServiceMBean public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout) { int id = addCallback(cb, message, to, timeout); - - if (cb instanceof AbstractWriteResponseHandler) - { - PBSPredictor.instance().startWriteOperation(id); - } - else if (cb instanceof ReadCallback) - { - PBSPredictor.instance().startReadOperation(id); - } - sendOneWay(message, id, to); return id; } @@ -740,20 +730,6 @@ public final class MessagingService implements MessagingServiceMBean ExecutorService stage = StageManager.getStage(message.getMessageType()); assert stage != null : "No stage for message type " + message.verb; - if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled()) - { - IAsyncCallback cb = MessagingService.instance().getRegisteredCallback(id).callback; - - if (cb instanceof AbstractWriteResponseHandler) - { - PBSPredictor.instance().logWriteResponse(id, timestamp); - } - else if (cb instanceof ReadCallback) - { - PBSPredictor.instance().logReadResponse(id, timestamp); - } - } - stage.execute(runnable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/service/PBSPredictionResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PBSPredictionResult.java b/src/java/org/apache/cassandra/service/PBSPredictionResult.java deleted file mode 100644 index 92c5491..0000000 --- a/src/java/org/apache/cassandra/service/PBSPredictionResult.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service; - -import java.io.Serializable; - -public class PBSPredictionResult implements Serializable -{ - private int n; - private int r; - private int w; - - private float timeSinceWrite; - private int numberVersionsStale; - - private float consistencyProbability; - - private float averageReadLatency; - private float averageWriteLatency; - private long percentileReadLatencyValue; - private float percentileReadLatencyPercentile; - private long percentileWriteLatencyValue; - private float percentileWriteLatencyPercentile; - - public PBSPredictionResult(int n, - int r, - int w, - float timeSinceWrite, - int numberVersionsStale, - float consistencyProbability, - float averageReadLatency, - float averageWriteLatency, - long percentileReadLatencyValue, - float percentileReadLatencyPercentile, - long percentileWriteLatencyValue, - float percentileWriteLatencyPercentile) { - this.n = n; - this.r = r; - this.w = w; - this.timeSinceWrite = timeSinceWrite; - this.numberVersionsStale = numberVersionsStale; - this.consistencyProbability = consistencyProbability; - this.averageReadLatency = averageReadLatency; - this.averageWriteLatency = averageWriteLatency; - this.percentileReadLatencyValue = percentileReadLatencyValue; - this.percentileReadLatencyPercentile = percentileReadLatencyPercentile; - this.percentileWriteLatencyValue = percentileWriteLatencyValue; - this.percentileWriteLatencyPercentile = percentileWriteLatencyPercentile; - } - - public int getN() - { - return n; - } - - public int getR() - { - return r; - } - - public int getW() - { - return w; - } - - public float getTimeSinceWrite() - { - return timeSinceWrite; - } - - public int getNumberVersionsStale() - { - return numberVersionsStale; - } - - public float getConsistencyProbability() - { - return consistencyProbability; - } - - public float getAverageReadLatency() - { - return averageReadLatency; - } - - public float getAverageWriteLatency() - { - return averageWriteLatency; - } - - public long getPercentileReadLatencyValue() - { - return percentileReadLatencyValue; - } - - public float getPercentileReadLatencyPercentile() - { - return percentileReadLatencyPercentile; - } - - public long getPercentileWriteLatencyValue() - { - return percentileWriteLatencyValue; - } - - public float getPercentileWriteLatencyPercentile() - { - return percentileWriteLatencyPercentile; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/service/PBSPredictor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PBSPredictor.java b/src/java/org/apache/cassandra/service/PBSPredictor.java deleted file mode 100644 index 85ef304..0000000 --- a/src/java/org/apache/cassandra/service/PBSPredictor.java +++ /dev/null @@ -1,630 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service; - -import java.lang.management.ManagementFactory; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Performs latency and consistency predictions as described in - * <a href="http://arxiv.org/pdf/1204.6082.pdf"> - * "Probabilistically Bounded Staleness for Practical Partial Quorums"</a> - * by Bailis et al. in VLDB 2012. The predictions are of the form: - * <p/> - * <i>With ReplicationFactor <tt>N</tt>, read consistency level of - * <tt>R</tt>, and write consistency level <tt>W</tt>, after - * <tt>t</tt> seconds, <tt>p</tt>% of reads will return a version - * within <tt>k</tt> versions of the last written; this should result - * in a latency of <tt>L</tt> ms.</i> - * <p/> - * <p/> - * These predictions should be used as a rough guideline for system - * operators. This interface is exposed through nodetool. - * <p/> - * <p/> - * The class accomplishes this by measuring latencies for reads and - * writes, then using Monte Carlo simulation to predict behavior under - * a given N,R, and W based on those latencies. - * <p/> - * <p/> - * We capture four distributions: - * <p/> - * <ul> - * <li> - * <tt>W</tt>: time from when the coordinator sends a mutation to the time - * that a replica begins to serve the new value(s) - * </li> - * <p/> - * <li> - * <tt>A</tt>: time from when a replica accepting a mutation sends an - * acknowledgment to the time the coordinator hears of it - * </li> - * <p/> - * <li> - * <tt>R</tt>: time from when the coordinator sends a read request to the time - * that the replica performs the read - * </li> - * <p/> - * <li> - * <tt>S</tt>: time from when the replica sends a read response to the time - * when the coordinator receives it - * </li> - * </ul> - * <p/> - * <tt>A</tt> and <tt>S</tt> are mostly network-bound, while W and R - * depend on both the network and local processing time. - * <p/> - * <p/> - * <b>Caveats:</b> - * Prediction is only as good as the latencies collected. Accurate - * prediction requires synchronizing clocks between replicas. We - * collect a running sample of latencies, but, if latencies change - * dramatically, predictions will be off. - * <p/> - * <p/> - * The predictions are conservative, or worst-case, meaning we may - * predict more staleness than in practice in the following ways: - * <ul> - * <li> - * We do not account for read repair. - * </li> - * <li> - * We do not account for Merkle tree exchange. - * </li> - * <li> - * Multi-version staleness is particularly conservative. - * </li> - * <li> - * We simulate non-local reads and writes. We assume that the - * coordinating Cassandra node is not itself a replica for a given key. - * </li> - * </ul> - * <p/> - * <p/> - * The predictions are optimistic in the following ways: - * <ul> - * <li> - * We do not predict the impact of node failure. - * </li> - * <li> - * We do not model hinted handoff. - * </li> - * </ul> - * - * @see org.apache.cassandra.thrift.ConsistencyLevel - * @see org.apache.cassandra.locator.AbstractReplicationStrategy - */ - -public class PBSPredictor implements PBSPredictorMBean -{ - private static final Logger logger = LoggerFactory.getLogger(PBSPredictor.class); - - public static final String MBEAN_NAME = "org.apache.cassandra.service:type=PBSPredictor"; - private static final boolean DEFAULT_DO_LOG_LATENCIES = false; - private static final int DEFAULT_MAX_LOGGED_LATENCIES = 10000; - private static final int DEFAULT_NUMBER_TRIALS_PREDICTION = 10000; - - /* - * We record a fixed size set of WARS latencies for read and - * mutation operations. We store the order in which each - * operation arrived, and use an LRU policy to evict old - * messages. - * - * This information is stored as a mapping from messageIDs to - * latencies. - */ - - /* - * Helper class which minimizes the number of HashMaps we maintain. - * For a given messageId, this class maintains the startTime of the message, - * and a queue for send times and reply times. - * - * sendLats corresponds to W and R, while replyLats is used for A and S. - */ - private class MessageLatencyCollection - { - MessageLatencyCollection(Long startTime) - { - this.startTime = startTime; - this.sendLats = new ConcurrentLinkedQueue<Long>(); - this.replyLats = new ConcurrentLinkedQueue<Long>(); - } - - void addSendLat(Long sendLat) - { - sendLats.add(sendLat); - } - - void addReplyLat(Long replyLat) - { - replyLats.add(replyLat); - } - - Collection<Long> getSendLats() - { - return sendLats; - } - - Collection<Long> getReplyLats() - { - return replyLats; - } - - Long getStartTime() - { - return startTime; - } - - Long startTime; - Collection<Long> sendLats; - Collection<Long> replyLats; - } - - // used for LRU replacement - private final Queue<Integer> writeMessageIds = new LinkedBlockingQueue<Integer>(); - private final Queue<Integer> readMessageIds = new LinkedBlockingQueue<Integer>(); - - private final Map<Integer, MessageLatencyCollection> messageIdToWriteLats = new ConcurrentHashMap<Integer, MessageLatencyCollection>(); - private final Map<Integer, MessageLatencyCollection> messageIdToReadLats = new ConcurrentHashMap<Integer, MessageLatencyCollection>(); - - private Random random; - private boolean initialized = false; - - private boolean logLatencies = DEFAULT_DO_LOG_LATENCIES; - private int maxLoggedLatencies = DEFAULT_MAX_LOGGED_LATENCIES; - private int numberTrialsPrediction = DEFAULT_NUMBER_TRIALS_PREDICTION; - - private static final PBSPredictor instance = new PBSPredictor(); - - public static PBSPredictor instance() - { - return instance; - } - - private PBSPredictor() - { - init(); - } - - public void enableConsistencyPredictionLogging() - { - logLatencies = true; - } - - public void disableConsistencyPredictionLogging() - { - logLatencies = false; - } - - public boolean isLoggingEnabled() - { - return logLatencies; - } - - public void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged) - { - maxLoggedLatencies = maxLogged; - } - - public void setNumberTrialsForConsistencyPrediction(int numTrials) - { - numberTrialsPrediction = numTrials; - } - - public void init() - { - if (!initialized) - { - random = new Random(); - - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(PBSPredictor.MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - initialized = true; - } - } - - - // used for random sampling from the latencies - private long getRandomElement(List<Long> list) - { - if (list.size() == 0) - throw new RuntimeException("Not enough data for prediction"); - return list.get(random.nextInt(list.size())); - } - - // used for calculating the average latency of a read or write operation - // given a set of simulated latencies - private float listAverage(List<Long> list) - { - long accum = 0; - for (long value : list) - accum += value; - return (float) accum / list.size(); - } - - // calculate the percentile entry of a list - private long getPercentile(List<Long> list, float percentile) - { - Collections.sort(list); - return list.get((int) (list.size() * percentile)); - } - - /* - * For our trials, sample the latency for the (replicaNumber)th - * reply for one of WARS - * if replicaNumber > the number of replicas we have data for - * (say we have data for ReplicationFactor 2 but ask for N=3) - * then we randomly sample from all response times - */ - private long getRandomLatencySample(Map<Integer, List<Long>> samples, int replicaNumber) - { - if (samples.containsKey(replicaNumber)) - { - return getRandomElement(samples.get(replicaNumber)); - } - - return getRandomElement(samples.get(samples.keySet().toArray()[random.nextInt(samples.keySet().size())])); - } - - /* - * To perform the prediction, we randomly sample from the - * collected WARS latencies, simulating writes followed by reads - * exactly t milliseconds afterwards. We count the number of - * reordered reads and writes to calculate the probability of - * staleness along with recording operation latencies. - */ - - - public PBSPredictionResult doPrediction(int n, - int r, - int w, - float timeSinceWrite, - int numberVersionsStale, - float percentileLatency) - { - if (r > n) - throw new IllegalArgumentException("r must be less than n"); - if (r < 0) - throw new IllegalArgumentException("r must be positive"); - if (w > n) - throw new IllegalArgumentException("w must be less than n"); - if (w < 0) - throw new IllegalArgumentException("w must be positive"); - if (percentileLatency < 0 || percentileLatency > 1) - throw new IllegalArgumentException("percentileLatency must be between 0 and 1 inclusive"); - if (numberVersionsStale < 0) - throw new IllegalArgumentException("numberVersionsStale must be positive"); - - if (!logLatencies) - throw new IllegalStateException("Latency logging is not enabled"); - - // get a mapping of {replica number : latency} for each of WARS - Map<Integer, List<Long>> wLatencies = getOrderedWLatencies(); - Map<Integer, List<Long>> aLatencies = getOrderedALatencies(); - Map<Integer, List<Long>> rLatencies = getOrderedRLatencies(); - Map<Integer, List<Long>> sLatencies = getOrderedSLatencies(); - - if (wLatencies.isEmpty() || aLatencies.isEmpty()) - throw new IllegalStateException("No write latencies have been recorded so far. Run some (non-local) inserts."); - - if (rLatencies.isEmpty() || sLatencies.isEmpty()) - throw new IllegalStateException("No read latencies have been recorded so far. Run some (non-local) reads."); - - // storage for simulated read and write latencies - ArrayList<Long> readLatencies = new ArrayList<Long>(); - ArrayList<Long> writeLatencies = new ArrayList<Long>(); - - long consistentReads = 0; - - // storage for latencies for each replica for a given Monte Carlo trial - // arr[i] will hold the ith replica's latency for one of WARS - ArrayList<Long> trialWLatencies = new ArrayList<Long>(); - ArrayList<Long> trialRLatencies = new ArrayList<Long>(); - - ArrayList<Long> replicaWriteLatencies = new ArrayList<Long>(); - ArrayList<Long> replicaReadLatencies = new ArrayList<Long>(); - - //run repeated trials and observe staleness - for (int i = 0; i < numberTrialsPrediction; ++i) - { - //simulate sending a write to N replicas then sending a - //read to N replicas and record the latencies by randomly - //sampling from gathered latencies - for (int replicaNo = 0; replicaNo < n; ++replicaNo) - { - long trialWLatency = getRandomLatencySample(wLatencies, replicaNo); - long trialALatency = getRandomLatencySample(aLatencies, replicaNo); - - trialWLatencies.add(trialWLatency); - - replicaWriteLatencies.add(trialWLatency + trialALatency); - } - - // reads are only sent to R replicas - so pick R random read and - // response latencies - for (int replicaNo = 0; replicaNo < r; ++replicaNo) - { - long trialRLatency = getRandomLatencySample(rLatencies, replicaNo); - long trialSLatency = getRandomLatencySample(sLatencies, replicaNo); - - trialRLatencies.add(trialRLatency); - - replicaReadLatencies.add(trialRLatency + trialSLatency); - } - - // the write latency for this trial is the time it takes - // for the wth replica to respond (W+A) - Collections.sort(replicaWriteLatencies); - long writeLatency = replicaWriteLatencies.get(w - 1); - writeLatencies.add(writeLatency); - - ArrayList<Long> sortedReplicaReadLatencies = new ArrayList<Long>(replicaReadLatencies); - Collections.sort(sortedReplicaReadLatencies); - - // the read latency for this trial is the time it takes - // for the rth replica to respond (R+S) - readLatencies.add(sortedReplicaReadLatencies.get(r - 1)); - - // were all of the read responses reordered? - - // for each of the first r messages (the ones the - // coordinator will pick from): - //--if the read message came in after this replica saw the - // write, it will be consistent - //--each read request is sent at time - // writeLatency+timeSinceWrite - - for (int responseNumber = 0; responseNumber < r; ++responseNumber) - { - int replicaNumber = replicaReadLatencies.indexOf(sortedReplicaReadLatencies.get(responseNumber)); - - if (writeLatency + timeSinceWrite + trialRLatencies.get(replicaNumber) >= - trialWLatencies.get(replicaNumber)) - { - consistentReads++; - break; - } - - // tombstone this replica in the case that we have - // duplicate read latencies - replicaReadLatencies.set(replicaNumber, -1L); - } - - // clear storage for the next trial - trialWLatencies.clear(); - trialRLatencies.clear(); - - replicaReadLatencies.clear(); - replicaWriteLatencies.clear(); - } - - float oneVersionConsistencyProbability = (float) consistentReads / numberTrialsPrediction; - - // to calculate multi-version staleness, we exponentiate the staleness probability by the number of versions - float consistencyProbability = (float) (1 - Math.pow((double) (1 - oneVersionConsistencyProbability), - numberVersionsStale)); - - float averageWriteLatency = listAverage(writeLatencies); - float averageReadLatency = listAverage(readLatencies); - - long percentileWriteLatency = getPercentile(writeLatencies, percentileLatency); - long percentileReadLatency = getPercentile(readLatencies, percentileLatency); - - return new PBSPredictionResult(n, - r, - w, - timeSinceWrite, - numberVersionsStale, - consistencyProbability, - averageReadLatency, - averageWriteLatency, - percentileReadLatency, - percentileLatency, - percentileWriteLatency, - percentileLatency); - } - - public void startWriteOperation(int id) - { - if (!logLatencies) - return; - - startWriteOperation(id, System.currentTimeMillis()); - } - - public void startWriteOperation(int id, long startTime) - { - if (!logLatencies) - return; - - assert (!messageIdToWriteLats.containsKey(id)); - - writeMessageIds.add(id); - - // LRU replacement of latencies - // the maximum number of entries is sloppy, but that's acceptable for our purposes - if (writeMessageIds.size() > maxLoggedLatencies) - { - Integer toEvict = writeMessageIds.remove(); - messageIdToWriteLats.remove(toEvict); - } - - messageIdToWriteLats.put(id, new MessageLatencyCollection(startTime)); - } - - public void startReadOperation(int id) - { - if (!logLatencies) - return; - - startReadOperation(id, System.currentTimeMillis()); - } - - public void startReadOperation(int id, long startTime) - { - if (!logLatencies) - return; - - assert (!messageIdToReadLats.containsKey(id)); - readMessageIds.add(id); - - // LRU replacement of latencies - // the maximum number of entries is sloppy, but that's acceptable for our purposes - if (readMessageIds.size() > maxLoggedLatencies) - { - Integer toEvict = readMessageIds.remove(); - messageIdToReadLats.remove(toEvict); - } - - messageIdToReadLats.put(id, new MessageLatencyCollection(startTime)); - } - - public void logWriteResponse(int id, long constructionTime) - { - if (!logLatencies) - return; - - logWriteResponse(id, constructionTime, System.currentTimeMillis()); - } - - public void logWriteResponse(int id, long responseCreationTime, long receivedTime) - { - if (!logLatencies) - return; - - MessageLatencyCollection writeLatsCollection = messageIdToWriteLats.get(id); - if (writeLatsCollection == null) - { - return; - } - - Long startTime = writeLatsCollection.getStartTime(); - writeLatsCollection.addSendLat(Math.max(0, responseCreationTime - startTime)); - writeLatsCollection.addReplyLat(Math.max(0, receivedTime - responseCreationTime)); - } - - public void logReadResponse(int id, long constructionTime) - { - if (!logLatencies) - return; - - logReadResponse(id, constructionTime, System.currentTimeMillis()); - } - - public void logReadResponse(int id, long responseCreationTime, long receivedTime) - { - if (!logLatencies) - return; - - MessageLatencyCollection readLatsCollection = messageIdToReadLats.get(id); - if (readLatsCollection == null) - { - return; - } - - Long startTime = readLatsCollection.getStartTime(); - readLatsCollection.addSendLat(Math.max(0, responseCreationTime - startTime)); - readLatsCollection.addReplyLat(Math.max(0, receivedTime - responseCreationTime)); - } - - Map<Integer, List<Long>> getOrderedWLatencies() - { - Collection<Collection<Long>> allWLatencies = new ArrayList<Collection<Long>>(); - for (MessageLatencyCollection wlc : messageIdToWriteLats.values()) - { - allWLatencies.add(wlc.getSendLats()); - } - - return getOrderedLatencies(allWLatencies); - } - - Map<Integer, List<Long>> getOrderedALatencies() - { - Collection<Collection<Long>> allALatencies = new ArrayList<Collection<Long>>(); - for (MessageLatencyCollection wlc : messageIdToWriteLats.values()) - allALatencies.add(wlc.getReplyLats()); - return getOrderedLatencies(allALatencies); - } - - Map<Integer, List<Long>> getOrderedRLatencies() - { - Collection<Collection<Long>> allRLatencies = new ArrayList<Collection<Long>>(); - for (MessageLatencyCollection rlc : messageIdToReadLats.values()) - { - allRLatencies.add(rlc.getSendLats()); - } - return getOrderedLatencies(allRLatencies); - } - - Map<Integer, List<Long>> getOrderedSLatencies() - { - Collection<Collection<Long>> allSLatencies = new ArrayList<Collection<Long>>(); - for (MessageLatencyCollection rlc : messageIdToReadLats.values()) - allSLatencies.add(rlc.getReplyLats()); - return getOrderedLatencies(allSLatencies); - } - - // Return the collected latencies indexed by response number instead of by messageID - private Map<Integer, List<Long>> getOrderedLatencies(Collection<Collection<Long>> latencyLists) - { - Map<Integer, List<Long>> ret = new HashMap<Integer, List<Long>>(); - - // N may vary - int maxResponses = 0; - - for (Collection<Long> latencies : latencyLists) - { - List<Long> sortedLatencies = new ArrayList<Long>(latencies); - Collections.sort(sortedLatencies); - - if (sortedLatencies.size() > maxResponses) - { - for (int i = maxResponses + 1; i <= sortedLatencies.size(); ++i) - { - ret.put(i, new Vector<Long>()); - } - - maxResponses = sortedLatencies.size(); - } - - // indexing by 0 is awkward since we're talking about the ith response - for (int i = 1; i <= sortedLatencies.size(); ++i) - { - ret.get(i).add(sortedLatencies.get(i - 1)); - } - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/service/PBSPredictorMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PBSPredictorMBean.java b/src/java/org/apache/cassandra/service/PBSPredictorMBean.java deleted file mode 100644 index 1e2f31a..0000000 --- a/src/java/org/apache/cassandra/service/PBSPredictorMBean.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service; - -public interface PBSPredictorMBean -{ - public PBSPredictionResult doPrediction(int n, - int r, - int w, - float timeSinceWrite, - int numberVersionsStale, - float percentileLatency); - - public void enableConsistencyPredictionLogging(); - public void disableConsistencyPredictionLogging(); - - public void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged); - public void setNumberTrialsForConsistencyPrediction(int numTrials); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 855c69a..c1acb44 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -473,8 +473,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new AssertionError(e); } - PBSPredictor.instance().init(); - if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) { logger.info("Loading persisted ring state"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 2140358..6e52f69 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -47,8 +47,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; import org.apache.cassandra.net.MessagingServiceMBean; import org.apache.cassandra.service.CacheServiceMBean; -import org.apache.cassandra.service.PBSPredictionResult; -import org.apache.cassandra.service.PBSPredictorMBean; import org.apache.cassandra.service.StorageProxyMBean; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.Pair; @@ -157,7 +155,6 @@ public class NodeCmd RANGEKEYSAMPLE, REBUILD_INDEX, RESETLOCALSCHEMA, - PREDICTCONSISTENCY } @@ -925,48 +922,6 @@ public class NodeCmd outs.println(probe.isThriftServerRunning() ? "running" : "not running"); } - public void predictConsistency(Integer replicationFactor, - Integer timeAfterWrite, - Integer numVersions, - Float percentileLatency, - PrintStream output) - { - PBSPredictorMBean predictorMBean = probe.getPBSPredictorMBean(); - - for(int r = 1; r <= replicationFactor; ++r) { - for(int w = 1; w <= replicationFactor; ++w) { - if(w+r > replicationFactor+1) - continue; - - try { - PBSPredictionResult result = predictorMBean.doPrediction(replicationFactor, - r, - w, - timeAfterWrite, - numVersions, - percentileLatency); - - if(r == 1 && w == 1) { - output.printf("%dms after a given write, with maximum version staleness of k=%d%n", timeAfterWrite, numVersions); - } - - output.printf("N=%d, R=%d, W=%d%n", replicationFactor, r, w); - output.printf("Probability of consistent reads: %f%n", result.getConsistencyProbability()); - output.printf("Average read latency: %fms (%.3fth %%ile %dms)%n", result.getAverageReadLatency(), - result.getPercentileReadLatencyPercentile()*100, - result.getPercentileReadLatencyValue()); - output.printf("Average write latency: %fms (%.3fth %%ile %dms)%n%n", result.getAverageWriteLatency(), - result.getPercentileWriteLatencyPercentile()*100, - result.getPercentileWriteLatencyValue()); - } catch (Exception e) { - System.out.println(e.getMessage()); - e.printStackTrace(); - return; - } - } - } - } - public static void main(String[] args) throws IOException, InterruptedException, ConfigurationException, ParseException { CommandLineParser parser = new PosixParser(); @@ -1244,20 +1199,6 @@ public class NodeCmd nodeCmd.printRangeKeySample(System.out); break; - case PREDICTCONSISTENCY: - if (arguments.length < 2) { badUse("Requires replication factor and time"); } - int numVersions = 1; - if (arguments.length == 3) { numVersions = Integer.parseInt(arguments[2]); } - float percentileLatency = .999f; - if (arguments.length == 4) { percentileLatency = Float.parseFloat(arguments[3]); } - - nodeCmd.predictConsistency(Integer.parseInt(arguments[0]), - Integer.parseInt(arguments[1]), - numVersions, - percentileLatency, - System.out); - break; - default : throw new RuntimeException("Unreachable code."); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index fb941f5..753a64c 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -77,7 +77,6 @@ public class NodeProbe public MessagingServiceMBean msProxy; private FailureDetectorMBean fdProxy; private CacheServiceMBean cacheService; - private PBSPredictorMBean PBSPredictorProxy; private StorageProxyMBean spProxy; private HintedHandOffManagerMBean hhProxy; private boolean failed; @@ -149,8 +148,6 @@ public class NodeProbe { ObjectName name = new ObjectName(ssObjName); ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); - name = new ObjectName(PBSPredictor.MBEAN_NAME); - PBSPredictorProxy = JMX.newMBeanProxy(mbeanServerConn, name, PBSPredictorMBean.class); name = new ObjectName(MessagingService.MBEAN_NAME); msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class); name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME); @@ -815,11 +812,6 @@ public class NodeProbe return ssProxy.describeRingJMX(keyspaceName); } - public PBSPredictorMBean getPBSPredictorMBean() - { - return PBSPredictorProxy; - } - public void rebuild(String sourceDc) { ssProxy.rebuild(sourceDc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c7141ee/test/unit/org/apache/cassandra/service/PBSPredictorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java deleted file mode 100644 index a0e2ef0..0000000 --- a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.service; - -import org.junit.Test; -import static org.junit.Assert.*; - -public class PBSPredictorTest -{ - private static PBSPredictor predictor = PBSPredictor.instance(); - - private void createWriteResponse(long W, long A, int id) - { - predictor.startWriteOperation(id, 0); - predictor.logWriteResponse(id, W, W+A); - } - - private void createReadResponse(long R, long S, int id) - { - predictor.startReadOperation(id, 0); - predictor.logReadResponse(id, R, R+S); - } - - @Test - public void testDoPrediction() - { - try { - predictor.enableConsistencyPredictionLogging(); - predictor.init(); - - /* - Ensure accuracy given a set of basic latencies - Predictions here match a prior Python implementation - */ - - for (int i = 0; i < 10; ++i) - { - createWriteResponse(10, 0, 10 * i); - createReadResponse(0, 0, 10 * i + 1); - } - - for (int i = 0; i < 10; ++i) - { - createWriteResponse(0, 0, 10 * i + 2); - } - - // 10ms after write - PBSPredictionResult result = predictor.doPrediction(2,1,1,10.0f,1, 0.99f); - - assertEquals(1, result.getConsistencyProbability(), 0); - assertEquals(2.5, result.getAverageWriteLatency(), .5); - - // 0ms after write - result = predictor.doPrediction(2,1,1,0f,1, 0.99f); - - assertEquals(.75, result.getConsistencyProbability(), 0.05); - - // k=5 versions staleness - result = predictor.doPrediction(2,1,1,5.0f,5, 0.99f); - assertEquals(.98, result.getConsistencyProbability(), 0.05); - assertEquals(2.5, result.getAverageWriteLatency(), .5); - - for (int i = 0; i < 10; ++i) - { - createWriteResponse(20, 0, 10 * i + 3); - } - - // 5ms after write - result = predictor.doPrediction(2,1,1,5.0f,1, 0.99f); - - assertEquals(.67, result.getConsistencyProbability(), .05); - - // N = 5 - result = predictor.doPrediction(5,1,1,5.0f,1, 0.99f); - - assertEquals(.42, result.getConsistencyProbability(), .05); - assertEquals(1.33, result.getAverageWriteLatency(), .5); - - for (int i = 0; i < 10; ++i) - { - createWriteResponse(100, 100, 10 * i + 4); - createReadResponse(100, 100, 10 * i + 5); - } - - result = predictor.doPrediction(2,1,1,0f,1, 0.99f); - - assertEquals(.860, result.getConsistencyProbability(), .05); - assertEquals(26.5, result.getAverageWriteLatency(), 1); - assertEquals(100.33, result.getAverageReadLatency(), 4); - - result = predictor.doPrediction(2,2,1,0f,1, 0.99f); - - assertEquals(1, result.getConsistencyProbability(), 0); - } catch (Exception e) { - fail(e.getMessage()); - } - } -}
