Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Jul 30 15:30:21 2009
@@ -1,686 +1,686 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.lang.management.ManagementFactory;
-
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.TimedStatsDeque;
-import org.apache.log4j.Logger;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-
-public class StorageProxy implements StorageProxyMBean
-{
-    private static Logger logger = Logger.getLogger(StorageProxy.class);
-
-    // mbean stuff
-    private static TimedStatsDeque readStats = new TimedStatsDeque(60000);
-    private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000);
-    private static TimedStatsDeque writeStats = new TimedStatsDeque(60000);
-    private StorageProxy() {}
-    static
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(new StorageProxy(), new 
ObjectName("org.apache.cassandra.service:type=StorageProxy"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * This method is responsible for creating Message to be
-     * sent over the wire to N replicas where some of the replicas
-     * may be hints.
-     */
-    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, 
Map<EndPoint, EndPoint> endpointMap) throws IOException
-    {
-               Map<EndPoint, Message> messageMap = new HashMap<EndPoint, 
Message>();
-               Message message = rm.makeRowMutationMessage();
-
-               for (Map.Entry<EndPoint, EndPoint> entry : 
endpointMap.entrySet())
-               {
-            EndPoint target = entry.getKey();
-            EndPoint hint = entry.getValue();
-            if ( !target.equals(hint) )
-                       {
-                               Message hintedMessage = 
rm.makeRowMutationMessage();
-                               hintedMessage.addHeader(RowMutation.HINT, 
EndPoint.toBytes(hint) );
-                               if (logger.isDebugEnabled())
-                                   logger.debug("Sending the hint of " + 
hint.getHost() + " to " + target.getHost());
-                               messageMap.put(target, hintedMessage);
-                       }
-                       else
-                       {
-                               messageMap.put(target, message);
-                       }
-               }
-               return messageMap;
-    }
-    
-    /**
-     * Use this method to have this RowMutation applied
-     * across all replicas. This method will take care
-     * of the possibility of a replica being down and hint
-     * the data across to some other replica. 
-     * @param rm the mutation to be applied across the replicas
-    */
-    public static void insert(RowMutation rm)
-       {
-        /*
-         * Get the N nodes from storage service where the data needs to be
-         * replicated
-         * Construct a message for write
-         * Send them asynchronously to the replicas.
-        */
-
-        long startTime = System.currentTimeMillis();
-               try
-               {
-                       Map<EndPoint, EndPoint> endpointMap = 
StorageService.instance().getNStorageEndPointMap(rm.key());
-                       // TODO: throw a thrift exception if we do not have N 
nodes
-                       Map<EndPoint, Message> messageMap = 
createWriteMessages(rm, endpointMap);
-                       for (Map.Entry<EndPoint, Message> entry : 
messageMap.entrySet())
-                       {
-                Message message = entry.getValue();
-                EndPoint endpoint = entry.getKey();
-                // Check if local and not hinted
-                byte[] hintedBytes = message.getHeader(RowMutation.HINT);
-                if (endpoint.equals(StorageService.getLocalStorageEndPoint())
-                        && !(hintedBytes!= null && hintedBytes.length>0))
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug("locally writing writing key " + rm.key()
-                                + " to " + endpoint);
-                    rm.apply();
-                } else
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + rm.key() + " to "
-                                + message.getMessageId() + "@" + endpoint);
-                       
MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
-                }
-                       }
-               }
-        catch (IOException e)
-        {
-            throw new RuntimeException("error inserting key " + rm.key(), e);
-        }
-        finally
-        {
-            writeStats.add(System.currentTimeMillis() - startTime);
-        }
-    }
-    
-    public static void insertBlocking(RowMutation rm, int consistency_level) 
throws UnavailableException
-    {
-        long startTime = System.currentTimeMillis();
-        Message message = null;
-        try
-        {
-            message = rm.makeRowMutationMessage();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        try
-        {
-            EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(rm.key());
-            if (endpoints.length < (DatabaseDescriptor.getReplicationFactor() 
/ 2) + 1)
-            {
-                throw new UnavailableException();
-            }
-            int blockFor;
-            if (consistency_level == ConsistencyLevel.ONE)
-            {
-                blockFor = 1;
-            }
-            else if (consistency_level == ConsistencyLevel.QUORUM)
-            {
-                blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) + 
1;
-            }
-            else if (consistency_level == ConsistencyLevel.ALL)
-            {
-                blockFor = DatabaseDescriptor.getReplicationFactor();
-            }
-            else
-            {
-                throw new UnsupportedOperationException("invalid consistency 
level " + consistency_level);
-            }
-            QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
-            if (logger.isDebugEnabled())
-                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
-
-            MessagingService.getMessagingInstance().sendRR(message, endpoints, 
quorumResponseHandler);
-            if (!quorumResponseHandler.get())
-                throw new UnavailableException();
-        }
-        catch (Exception e)
-        {
-            logger.error("error writing key " + rm.key(), e);
-            throw new UnavailableException();
-        }
-        finally
-        {
-            writeStats.add(System.currentTimeMillis() - startTime);
-        }
-    }
-
-    public static void insertBlocking(RowMutation rm) throws 
UnavailableException
-    {
-        insertBlocking(rm, ConsistencyLevel.QUORUM);
-    }
-    
-    private static Map<String, Message> constructMessages(Map<String, 
ReadCommand> readMessages) throws IOException
-    {
-        Map<String, Message> messages = new HashMap<String, Message>();
-        Set<String> keys = readMessages.keySet();        
-        for ( String key : keys )
-        {
-            Message message = readMessages.get(key).makeReadMessage();
-            messages.put(key, message);
-        }        
-        return messages;
-    }
-    
-    private static IAsyncResult dispatchMessages(Map<String, EndPoint> 
endPoints, Map<String, Message> messages)
-    {
-        Set<String> keys = endPoints.keySet();
-        EndPoint[] eps = new EndPoint[keys.size()];
-        Message[] msgs  = new Message[keys.size()];
-        
-        int i = 0;
-        for ( String key : keys )
-        {
-            eps[i] = endPoints.get(key);
-            msgs[i] = messages.get(key);
-            ++i;
-        }
-        
-        IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(msgs, eps);
-        return iar;
-    }
-    
-    /**
-     * This is an implementation for the multiget version. 
-     * @param readMessages map of key --> ReadMessage to be sent
-     * @return map of key --> Row
-     * @throws IOException
-     * @throws TimeoutException
-     */
-    public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> 
readMessages) throws IOException,TimeoutException
-    {
-        Map<String, Row> rows = new HashMap<String, Row>();
-        Set<String> keys = readMessages.keySet();
-        /* Find all the suitable endpoints for the keys */
-        Map<String, EndPoint> endPoints = 
StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
-        /* Construct the messages to be sent out */
-        Map<String, Message> messages = constructMessages(readMessages);
-        /* Dispatch the messages to the respective endpoints */
-        IAsyncResult iar = dispatchMessages(endPoints, messages);        
-        List<byte[]> results = 
iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-        
-        for ( byte[] body : results )
-        {
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            ReadResponse response = 
ReadResponse.serializer().deserialize(bufIn);
-            Row row = response.row();
-            rows.put(row.key(), row);
-        }        
-        return rows;
-    }
-
-    /**
-     * Read the data from one replica.  If there is no reply, read the data 
from another.  In the event we get
-     * the data we perform consistency checks and figure out if any repairs 
need to be done to the replicas.
-     * @param command the read to perform
-     * @return the row associated with command.key
-     * @throws Exception
-     */
-    private static Row weakReadRemote(ReadCommand command) throws IOException
-    {
-        EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
-        assert endPoint != null;
-        Message message = command.makeReadMessage();
-        if (logger.isDebugEnabled())
-            logger.debug("weakreadremote reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
-        message.addHeader(ReadCommand.DO_REPAIR, 
ReadCommand.DO_REPAIR.getBytes());
-        IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, endPoint);
-        byte[] body;
-        try
-        {
-            body = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
-        }
-        catch (TimeoutException e)
-        {
-            throw new RuntimeException("error reading key " + command.key, e);
-            // TODO retry to a different endpoint?
-        }
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
-        ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
-        return response.row();
-    }
-
-    /**
-     * Performs the actual reading of a row out of the StorageService, fetching
-     * a specific set of column names from a given column family.
-     */
-    public static Row readProtocol(ReadCommand command, int consistency_level)
-    throws IOException, TimeoutException, InvalidRequestException
-    {
-        long startTime = System.currentTimeMillis();
-
-        Row row;
-        EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(command.key);
-
-        if (consistency_level == ConsistencyLevel.ONE)
-        {
-            boolean foundLocal = 
Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
-            if (foundLocal)
-            {
-                row = weakReadLocal(command);
-            }
-            else
-            {
-                row = weakReadRemote(command);
-            }
-        }
-        else
-        {
-            assert consistency_level == ConsistencyLevel.QUORUM;
-            row = strongRead(command);
-        }
-
-        readStats.add(System.currentTimeMillis() - startTime);
-
-        return row;
-    }
-
-    public static Map<String, Row> readProtocol(String[] keys, ReadCommand 
readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
-    {
-        Map<String, Row> rows = new HashMap<String, Row>();        
-        switch ( consistencyLevel )
-        {
-            case WEAK:
-                rows = weakReadProtocol(keys, readCommand);
-                break;
-                
-            case STRONG:
-                rows = strongReadProtocol(keys, readCommand);
-                break;
-                
-            default:
-                rows = weakReadProtocol(keys, readCommand);
-                break;
-        }
-        return rows;
-    }
-
-    /**
-     * This is a multiget version of the above method.
-     * @param tablename
-     * @param keys
-     * @param columnFamily
-     * @param start
-     * @param count
-     * @return
-     * @throws IOException
-     * @throws TimeoutException
-     */
-    public static Map<String, Row> strongReadProtocol(String[] keys, 
ReadCommand readCommand) throws IOException, TimeoutException
-    {       
-        Map<String, Row> rows;
-        // TODO: throw a thrift exception if we do not have N nodes
-        Map<String, ReadCommand[]> readMessages = new HashMap<String, 
ReadCommand[]>();
-        for (String key : keys )
-        {
-            ReadCommand[] readParameters = new ReadCommand[2];
-            readParameters[0] = readCommand.copy();
-            readParameters[1] = readCommand.copy();
-            readParameters[1].setDigestQuery(true);
-            readMessages.put(key, readParameters);
-        }        
-        rows = doStrongReadProtocol(readMessages);         
-        return rows;
-    }
-
-    /*
-     * This function executes the read protocol.
-        // 1. Get the N nodes from storage service where the data needs to be
-        // replicated
-        // 2. Construct a message for read\write
-         * 3. Set one of the messages to get the data and the rest to get the 
digest
-        // 4. SendRR ( to all the nodes above )
-        // 5. Wait for a response from at least X nodes where X <= N and the 
data node
-         * 6. If the digest matches return the data.
-         * 7. else carry out read repair by getting data from all the nodes.
-        // 5. return success
-     */
-    private static Row strongRead(ReadCommand command) throws IOException, 
TimeoutException, InvalidRequestException
-    {
-        // TODO: throw a thrift exception if we do not have N nodes
-        assert !command.isDigestQuery();
-        ReadCommand readMessageDigestOnly = command.copy();
-        readMessageDigestOnly.setDigestQuery(true);
-
-        Row row = null;
-        Message message = command.makeReadMessage();
-        Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
-
-        IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver();
-        QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(
-                DatabaseDescriptor.getQuorum(),
-                readResponseResolver);
-        EndPoint dataPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
-        List<EndPoint> endpointList = new 
ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
-        /* Remove the local storage endpoint from the list. */
-        endpointList.remove(dataPoint);
-        EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
-        Message messages[] = new Message[endpointList.size() + 1];
-
-        /*
-         * First message is sent to the node that will actually get
-         * the data for us. The other two replicas are only sent a
-         * digest query.
-        */
-        endPoints[0] = dataPoint;
-        messages[0] = message;
-        if (logger.isDebugEnabled())
-            logger.debug("strongread reading data for " + command + " from " + 
message.getMessageId() + "@" + dataPoint);
-        for (int i = 1; i < endPoints.length; i++)
-        {
-            EndPoint digestPoint = endpointList.get(i - 1);
-            endPoints[i] = digestPoint;
-            messages[i] = messageDigestOnly;
-            if (logger.isDebugEnabled())
-                logger.debug("strongread reading digest for " + command + " 
from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
-        }
-
-        try
-        {
-            MessagingService.getMessagingInstance().sendRR(messages, 
endPoints, quorumResponseHandler);
-
-            long startTime2 = System.currentTimeMillis();
-            row = quorumResponseHandler.get();
-            if (logger.isDebugEnabled())
-                logger.debug("quorumResponseHandler: " + 
(System.currentTimeMillis() - startTime2) + " ms.");
-        }
-        catch (DigestMismatchException ex)
-        {
-            if ( DatabaseDescriptor.getConsistencyCheck())
-            {
-                IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver();
-                QuorumResponseHandler<Row> quorumResponseHandlerRepair = new 
QuorumResponseHandler<Row>(
-                        DatabaseDescriptor.getQuorum(),
-                        readResponseResolverRepair);
-                logger.info("DigestMismatchException: " + command.key);
-                Message messageRepair = command.makeReadMessage();
-                MessagingService.getMessagingInstance().sendRR(messageRepair, 
endPoints,
-                                                               
quorumResponseHandlerRepair);
-                try
-                {
-                    row = quorumResponseHandlerRepair.get();
-                }
-                catch (DigestMismatchException e)
-                {
-                    // TODO should this be a thrift exception?
-                    throw new RuntimeException("digest mismatch reading key " 
+ command.key, e);
-                }
-            }
-        }
-
-        return row;
-    }
-
-    private static Map<String, Message[]> constructReplicaMessages(Map<String, 
ReadCommand[]> readMessages) throws IOException
-    {
-        Map<String, Message[]> messages = new HashMap<String, Message[]>();
-        Set<String> keys = readMessages.keySet();
-        
-        for ( String key : keys )
-        {
-            Message[] msg = new 
Message[DatabaseDescriptor.getReplicationFactor()];
-            ReadCommand[] readParameters = readMessages.get(key);
-            msg[0] = readParameters[0].makeReadMessage();
-            for ( int i = 1; i < msg.length; ++i )
-            {
-                msg[i] = readParameters[1].makeReadMessage();
-            }
-        }        
-        return messages;
-    }
-    
-    private static MultiQuorumResponseHandler dispatchMessages(Map<String, 
ReadCommand[]> readMessages, Map<String, Message[]> messages) throws IOException
-    {
-        Set<String> keys = messages.keySet();
-        /* This maps the keys to the original data read messages */
-        Map<String, ReadCommand> readMessage = new HashMap<String, 
ReadCommand>();
-        /* This maps the keys to their respective endpoints/replicas */
-        Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
-        /* Groups the messages that need to be sent to the individual keys */
-        Message[][] msgList = new 
Message[messages.size()][DatabaseDescriptor.getReplicationFactor()];
-        /* Respects the above grouping and provides the endpoints for the 
above messages */
-        EndPoint[][] epList = new 
EndPoint[messages.size()][DatabaseDescriptor.getReplicationFactor()];
-        
-        int i = 0;
-        for ( String key : keys )
-        {
-            /* This is the primary */
-            EndPoint dataPoint = 
StorageService.instance().findSuitableEndPoint(key);
-            List<EndPoint> replicas = new ArrayList<EndPoint>( 
StorageService.instance().getNLiveStorageEndPoint(key) );
-            replicas.remove(dataPoint);
-            /* Get the messages to be sent index 0 is the data messages and 
index 1 is the digest message */
-            Message[] message = messages.get(key);           
-            msgList[i][0] = message[0];
-            int N = DatabaseDescriptor.getReplicationFactor();
-            for ( int j = 1; j < N; ++j )
-            {
-                msgList[i][j] = message[1];
-            }
-            /* Get the endpoints to which the above messages need to be sent */
-            epList[i][0] = dataPoint;
-            for ( int j = 1; i < N; ++i )
-            {                
-                epList[i][j] = replicas.get(j - 1);
-            } 
-            /* Data ReadMessage associated with this key */
-            readMessage.put( key, readMessages.get(key)[0] );
-            /* EndPoints for this specific key */
-            endpoints.put(key, epList[i]);
-            ++i;
-        }
-                
-        /* Handles the read semantics for this entire set of keys */
-        MultiQuorumResponseHandler quorumResponseHandlers = new 
MultiQuorumResponseHandler(readMessage, endpoints);
-        MessagingService.getMessagingInstance().sendRR(msgList, epList, 
quorumResponseHandlers);
-        return quorumResponseHandlers;
-    }
-    
-    /**
-    *  This method performs the read from the replicas for a bunch of keys.
-    *  @param readMessages map of key --> readMessage[] of two entries where 
-    *         the first entry is the readMessage for the data and the second
-    *         is the entry for the digest 
-    *  @return map containing key ---> Row
-    *  @throws IOException, TimeoutException
-   */
-    private static Map<String, Row> doStrongReadProtocol(Map<String, 
ReadCommand[]> readMessages) throws IOException
-    {        
-        Map<String, Row> rows = new HashMap<String, Row>();
-        /* Construct the messages to be sent to the replicas */
-        Map<String, Message[]> replicaMessages = 
constructReplicaMessages(readMessages);
-        /* Dispatch the messages to the different replicas */
-        MultiQuorumResponseHandler cb = dispatchMessages(readMessages, 
replicaMessages);
-        try
-        {
-            Row[] rows2 = cb.get();
-            for ( Row row : rows2 )
-            {
-                rows.put(row.key(), row);
-            }
-        }
-        catch (TimeoutException e)
-        {
-            throw new RuntimeException("timeout reading keys " + 
StringUtils.join(rows.keySet(), ", "), e);
-        }
-        return rows;
-    }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param columns the columns we are interested in
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand 
readCommand) throws Exception
-    {
-        Row row = null;
-        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCmd = readCommand.copy();
-            readMessages.put(key, readCmd);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && 
DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
readMessages.get(key));
-        }
-        return rows;
-    }
-
-    /*
-    * This function executes the read protocol locally and should be used only 
if consistency is not a concern.
-    * Read the data from the local disk and return if the row is NOT NULL. If 
the data is NULL do the read from
-    * one of the other replicas (in the same data center if possible) till we 
get the data. In the event we get
-    * the data we perform consistency checks and figure out if any repairs 
need to be done to the replicas.
-    */
-    private static Row weakReadLocal(ReadCommand command) throws IOException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("weakreadlocal reading " + command);
-        List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(command.key);
-        /* Remove the local storage endpoint from the list. */
-        endpoints.remove(StorageService.getLocalStorageEndPoint());
-        // TODO: throw a thrift exception if we do not have N nodes
-
-        Table table = Table.open(command.table);
-        Row row = command.getRow(table);
-
-        /*
-           * Do the consistency checks in the background and return the
-           * non NULL row.
-           */
-        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-            StorageService.instance().doConsistencyCheck(row, endpoints, 
command);
-        return row;
-    }
-
-    static List<String> getKeyRange(RangeCommand command)
-    {
-        long startTime = System.currentTimeMillis();
-        try
-        {
-            EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith);
-            IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
-
-            // read response
-            // TODO send more requests if we need to span multiple nodes
-            byte[] responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
-            return RangeReply.read(responseBody).keys;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("error reading keyrange " + command, e);
-        }
-        finally
-        {
-            rangeStats.add(System.currentTimeMillis() - startTime);
-        }
-    }
-
-    public double getReadLatency()
-    {
-        return readStats.mean();
-    }
-
-    public double getRangeLatency()
-    {
-        return rangeStats.mean();
-    }
-
-    public double getWriteLatency()
-    {
-        return writeStats.mean();
-    }
-
-    public int getReadOperations()
-    {
-        return readStats.size();
-    }
-
-    public int getRangeOperations()
-    {
-        return rangeStats.size();
-    }
-
-    public int getWriteOperations()
-    {
-        return writeStats.size();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.lang.management.ManagementFactory;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+
+public class StorageProxy implements StorageProxyMBean
+{
+    private static Logger logger = Logger.getLogger(StorageProxy.class);
+
+    // mbean stuff
+    private static TimedStatsDeque readStats = new TimedStatsDeque(60000);
+    private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000);
+    private static TimedStatsDeque writeStats = new TimedStatsDeque(60000);
+    private StorageProxy() {}
+    static
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(new StorageProxy(), new 
ObjectName("org.apache.cassandra.service:type=StorageProxy"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * This method is responsible for creating Message to be
+     * sent over the wire to N replicas where some of the replicas
+     * may be hints.
+     */
+    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, 
Map<EndPoint, EndPoint> endpointMap) throws IOException
+    {
+               Map<EndPoint, Message> messageMap = new HashMap<EndPoint, 
Message>();
+               Message message = rm.makeRowMutationMessage();
+
+               for (Map.Entry<EndPoint, EndPoint> entry : 
endpointMap.entrySet())
+               {
+            EndPoint target = entry.getKey();
+            EndPoint hint = entry.getValue();
+            if ( !target.equals(hint) )
+                       {
+                               Message hintedMessage = 
rm.makeRowMutationMessage();
+                               hintedMessage.addHeader(RowMutation.HINT, 
EndPoint.toBytes(hint) );
+                               if (logger.isDebugEnabled())
+                                   logger.debug("Sending the hint of " + 
hint.getHost() + " to " + target.getHost());
+                               messageMap.put(target, hintedMessage);
+                       }
+                       else
+                       {
+                               messageMap.put(target, message);
+                       }
+               }
+               return messageMap;
+    }
+    
+    /**
+     * Use this method to have this RowMutation applied
+     * across all replicas. This method will take care
+     * of the possibility of a replica being down and hint
+     * the data across to some other replica. 
+     * @param rm the mutation to be applied across the replicas
+    */
+    public static void insert(RowMutation rm)
+       {
+        /*
+         * Get the N nodes from storage service where the data needs to be
+         * replicated
+         * Construct a message for write
+         * Send them asynchronously to the replicas.
+        */
+
+        long startTime = System.currentTimeMillis();
+               try
+               {
+                       Map<EndPoint, EndPoint> endpointMap = 
StorageService.instance().getNStorageEndPointMap(rm.key());
+                       // TODO: throw a thrift exception if we do not have N 
nodes
+                       Map<EndPoint, Message> messageMap = 
createWriteMessages(rm, endpointMap);
+                       for (Map.Entry<EndPoint, Message> entry : 
messageMap.entrySet())
+                       {
+                Message message = entry.getValue();
+                EndPoint endpoint = entry.getKey();
+                // Check if local and not hinted
+                byte[] hintedBytes = message.getHeader(RowMutation.HINT);
+                if (endpoint.equals(StorageService.getLocalStorageEndPoint())
+                        && !(hintedBytes!= null && hintedBytes.length>0))
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("locally writing writing key " + rm.key()
+                                + " to " + endpoint);
+                    rm.apply();
+                } else
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("insert writing key " + rm.key() + " to "
+                                + message.getMessageId() + "@" + endpoint);
+                       
MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+                }
+                       }
+               }
+        catch (IOException e)
+        {
+            throw new RuntimeException("error inserting key " + rm.key(), e);
+        }
+        finally
+        {
+            writeStats.add(System.currentTimeMillis() - startTime);
+        }
+    }
+    
+    public static void insertBlocking(RowMutation rm, int consistency_level) 
throws UnavailableException
+    {
+        long startTime = System.currentTimeMillis();
+        Message message = null;
+        try
+        {
+            message = rm.makeRowMutationMessage();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        try
+        {
+            EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(rm.key());
+            if (endpoints.length < (DatabaseDescriptor.getReplicationFactor() 
/ 2) + 1)
+            {
+                throw new UnavailableException();
+            }
+            int blockFor;
+            if (consistency_level == ConsistencyLevel.ONE)
+            {
+                blockFor = 1;
+            }
+            else if (consistency_level == ConsistencyLevel.QUORUM)
+            {
+                blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) + 
1;
+            }
+            else if (consistency_level == ConsistencyLevel.ALL)
+            {
+                blockFor = DatabaseDescriptor.getReplicationFactor();
+            }
+            else
+            {
+                throw new UnsupportedOperationException("invalid consistency 
level " + consistency_level);
+            }
+            QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
+            if (logger.isDebugEnabled())
+                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
+
+            MessagingService.getMessagingInstance().sendRR(message, endpoints, 
quorumResponseHandler);
+            if (!quorumResponseHandler.get())
+                throw new UnavailableException();
+        }
+        catch (Exception e)
+        {
+            logger.error("error writing key " + rm.key(), e);
+            throw new UnavailableException();
+        }
+        finally
+        {
+            writeStats.add(System.currentTimeMillis() - startTime);
+        }
+    }
+
    /**
     * Blocking insert at the default consistency level, QUORUM.
     *
     * @param rm the mutation to apply across the replicas
     * @throws UnavailableException if the quorum of acks is not obtained
     */
    public static void insertBlocking(RowMutation rm) throws UnavailableException
    {
        insertBlocking(rm, ConsistencyLevel.QUORUM);
    }
+    
+    private static Map<String, Message> constructMessages(Map<String, 
ReadCommand> readMessages) throws IOException
+    {
+        Map<String, Message> messages = new HashMap<String, Message>();
+        Set<String> keys = readMessages.keySet();        
+        for ( String key : keys )
+        {
+            Message message = readMessages.get(key).makeReadMessage();
+            messages.put(key, message);
+        }        
+        return messages;
+    }
+    
+    private static IAsyncResult dispatchMessages(Map<String, EndPoint> 
endPoints, Map<String, Message> messages)
+    {
+        Set<String> keys = endPoints.keySet();
+        EndPoint[] eps = new EndPoint[keys.size()];
+        Message[] msgs  = new Message[keys.size()];
+        
+        int i = 0;
+        for ( String key : keys )
+        {
+            eps[i] = endPoints.get(key);
+            msgs[i] = messages.get(key);
+            ++i;
+        }
+        
+        IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(msgs, eps);
+        return iar;
+    }
+    
+    /**
+     * This is an implementation for the multiget version. 
+     * @param readMessages map of key --> ReadMessage to be sent
+     * @return map of key --> Row
+     * @throws IOException
+     * @throws TimeoutException
+     */
+    public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> 
readMessages) throws IOException,TimeoutException
+    {
+        Map<String, Row> rows = new HashMap<String, Row>();
+        Set<String> keys = readMessages.keySet();
+        /* Find all the suitable endpoints for the keys */
+        Map<String, EndPoint> endPoints = 
StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
+        /* Construct the messages to be sent out */
+        Map<String, Message> messages = constructMessages(readMessages);
+        /* Dispatch the messages to the respective endpoints */
+        IAsyncResult iar = dispatchMessages(endPoints, messages);        
+        List<byte[]> results = 
iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+        
+        for ( byte[] body : results )
+        {
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            ReadResponse response = 
ReadResponse.serializer().deserialize(bufIn);
+            Row row = response.row();
+            rows.put(row.key(), row);
+        }        
+        return rows;
+    }
+
+    /**
+     * Read the data from one replica.  If there is no reply, read the data 
from another.  In the event we get
+     * the data we perform consistency checks and figure out if any repairs 
need to be done to the replicas.
+     * @param command the read to perform
+     * @return the row associated with command.key
+     * @throws Exception
+     */
+    private static Row weakReadRemote(ReadCommand command) throws IOException
+    {
+        EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
+        assert endPoint != null;
+        Message message = command.makeReadMessage();
+        if (logger.isDebugEnabled())
+            logger.debug("weakreadremote reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
+        message.addHeader(ReadCommand.DO_REPAIR, 
ReadCommand.DO_REPAIR.getBytes());
+        IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, endPoint);
+        byte[] body;
+        try
+        {
+            body = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+        }
+        catch (TimeoutException e)
+        {
+            throw new RuntimeException("error reading key " + command.key, e);
+            // TODO retry to a different endpoint?
+        }
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+        return response.row();
+    }
+
+    /**
+     * Performs the actual reading of a row out of the StorageService, fetching
+     * a specific set of column names from a given column family.
+     */
+    public static Row readProtocol(ReadCommand command, int consistency_level)
+    throws IOException, TimeoutException, InvalidRequestException
+    {
+        long startTime = System.currentTimeMillis();
+
+        Row row;
+        EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(command.key);
+
+        if (consistency_level == ConsistencyLevel.ONE)
+        {
+            boolean foundLocal = 
Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+            if (foundLocal)
+            {
+                row = weakReadLocal(command);
+            }
+            else
+            {
+                row = weakReadRemote(command);
+            }
+        }
+        else
+        {
+            assert consistency_level == ConsistencyLevel.QUORUM;
+            row = strongRead(command);
+        }
+
+        readStats.add(System.currentTimeMillis() - startTime);
+
+        return row;
+    }
+
+    public static Map<String, Row> readProtocol(String[] keys, ReadCommand 
readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        Map<String, Row> rows = new HashMap<String, Row>();        
+        switch ( consistencyLevel )
+        {
+            case WEAK:
+                rows = weakReadProtocol(keys, readCommand);
+                break;
+                
+            case STRONG:
+                rows = strongReadProtocol(keys, readCommand);
+                break;
+                
+            default:
+                rows = weakReadProtocol(keys, readCommand);
+                break;
+        }
+        return rows;
+    }
+
+    /**
+     * This is a multiget version of the above method.
+     * @param tablename
+     * @param keys
+     * @param columnFamily
+     * @param start
+     * @param count
+     * @return
+     * @throws IOException
+     * @throws TimeoutException
+     */
+    public static Map<String, Row> strongReadProtocol(String[] keys, 
ReadCommand readCommand) throws IOException, TimeoutException
+    {       
+        Map<String, Row> rows;
+        // TODO: throw a thrift exception if we do not have N nodes
+        Map<String, ReadCommand[]> readMessages = new HashMap<String, 
ReadCommand[]>();
+        for (String key : keys )
+        {
+            ReadCommand[] readParameters = new ReadCommand[2];
+            readParameters[0] = readCommand.copy();
+            readParameters[1] = readCommand.copy();
+            readParameters[1].setDigestQuery(true);
+            readMessages.put(key, readParameters);
+        }        
+        rows = doStrongReadProtocol(readMessages);         
+        return rows;
+    }
+
    /*
     * Quorum ("strong") read of a single key:
     * 1. Pick the data endpoint and the remaining replica endpoints.
     * 2. Send the full read to the data endpoint and digest-only reads to the
     *    others, all through one quorum response handler.
     * 3. If the digests agree, return the resolved row.
     * 4. On a digest mismatch (and if consistency checking is enabled),
     *    re-read full data from all endpoints to repair and resolve.
     *
     * NOTE(review): if the digests mismatch while consistency checking is
     * disabled, this falls through and returns null -- presumably intentional
     * best-effort behavior, but worth confirming with callers.
     */
    private static Row strongRead(ReadCommand command) throws IOException, TimeoutException, InvalidRequestException
    {
        // TODO: throw a thrift exception if we do not have N nodes
        assert !command.isDigestQuery();
        // Digest-only twin of the command, sent to the non-data replicas.
        ReadCommand readMessageDigestOnly = command.copy();
        readMessageDigestOnly.setDigestQuery(true);

        Row row = null;
        Message message = command.makeReadMessage();
        Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();

        IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
        QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
                DatabaseDescriptor.getQuorum(),
                readResponseResolver);
        EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
        List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
        /* Remove the data endpoint from the list; it is re-added at slot 0 below. */
        endpointList.remove(dataPoint);
        EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
        Message messages[] = new Message[endpointList.size() + 1];

        /*
         * First message is sent to the node that will actually get
         * the data for us. The other replicas are only sent a
         * digest query.
        */
        endPoints[0] = dataPoint;
        messages[0] = message;
        if (logger.isDebugEnabled())
            logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
        for (int i = 1; i < endPoints.length; i++)
        {
            EndPoint digestPoint = endpointList.get(i - 1);
            endPoints[i] = digestPoint;
            messages[i] = messageDigestOnly;
            if (logger.isDebugEnabled())
                logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
        }

        try
        {
            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);

            long startTime2 = System.currentTimeMillis();
            // Blocks until the quorum handler resolves (or detects a mismatch).
            row = quorumResponseHandler.get();
            if (logger.isDebugEnabled())
                logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
        }
        catch (DigestMismatchException ex)
        {
            if ( DatabaseDescriptor.getConsistencyCheck())
            {
                // Read repair: fetch full data from every endpoint and resolve.
                IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
                QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
                        DatabaseDescriptor.getQuorum(),
                        readResponseResolverRepair);
                logger.info("DigestMismatchException: " + command.key);
                Message messageRepair = command.makeReadMessage();
                MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
                                                               quorumResponseHandlerRepair);
                try
                {
                    row = quorumResponseHandlerRepair.get();
                }
                catch (DigestMismatchException e)
                {
                    // TODO should this be a thrift exception?
                    throw new RuntimeException("digest mismatch reading key " + command.key, e);
                }
            }
        }

        return row;
    }
+
+    private static Map<String, Message[]> constructReplicaMessages(Map<String, 
ReadCommand[]> readMessages) throws IOException
+    {
+        Map<String, Message[]> messages = new HashMap<String, Message[]>();
+        Set<String> keys = readMessages.keySet();
+        
+        for ( String key : keys )
+        {
+            Message[] msg = new 
Message[DatabaseDescriptor.getReplicationFactor()];
+            ReadCommand[] readParameters = readMessages.get(key);
+            msg[0] = readParameters[0].makeReadMessage();
+            for ( int i = 1; i < msg.length; ++i )
+            {
+                msg[i] = readParameters[1].makeReadMessage();
+            }
+        }        
+        return messages;
+    }
+    
+    private static MultiQuorumResponseHandler dispatchMessages(Map<String, 
ReadCommand[]> readMessages, Map<String, Message[]> messages) throws IOException
+    {
+        Set<String> keys = messages.keySet();
+        /* This maps the keys to the original data read messages */
+        Map<String, ReadCommand> readMessage = new HashMap<String, 
ReadCommand>();
+        /* This maps the keys to their respective endpoints/replicas */
+        Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
+        /* Groups the messages that need to be sent to the individual keys */
+        Message[][] msgList = new 
Message[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+        /* Respects the above grouping and provides the endpoints for the 
above messages */
+        EndPoint[][] epList = new 
EndPoint[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+        
+        int i = 0;
+        for ( String key : keys )
+        {
+            /* This is the primary */
+            EndPoint dataPoint = 
StorageService.instance().findSuitableEndPoint(key);
+            List<EndPoint> replicas = new ArrayList<EndPoint>( 
StorageService.instance().getNLiveStorageEndPoint(key) );
+            replicas.remove(dataPoint);
+            /* Get the messages to be sent index 0 is the data messages and 
index 1 is the digest message */
+            Message[] message = messages.get(key);           
+            msgList[i][0] = message[0];
+            int N = DatabaseDescriptor.getReplicationFactor();
+            for ( int j = 1; j < N; ++j )
+            {
+                msgList[i][j] = message[1];
+            }
+            /* Get the endpoints to which the above messages need to be sent */
+            epList[i][0] = dataPoint;
+            for ( int j = 1; i < N; ++i )
+            {                
+                epList[i][j] = replicas.get(j - 1);
+            } 
+            /* Data ReadMessage associated with this key */
+            readMessage.put( key, readMessages.get(key)[0] );
+            /* EndPoints for this specific key */
+            endpoints.put(key, epList[i]);
+            ++i;
+        }
+                
+        /* Handles the read semantics for this entire set of keys */
+        MultiQuorumResponseHandler quorumResponseHandlers = new 
MultiQuorumResponseHandler(readMessage, endpoints);
+        MessagingService.getMessagingInstance().sendRR(msgList, epList, 
quorumResponseHandlers);
+        return quorumResponseHandlers;
+    }
+    
+    /**
+    *  This method performs the read from the replicas for a bunch of keys.
+    *  @param readMessages map of key --> readMessage[] of two entries where 
+    *         the first entry is the readMessage for the data and the second
+    *         is the entry for the digest 
+    *  @return map containing key ---> Row
+    *  @throws IOException, TimeoutException
+   */
+    private static Map<String, Row> doStrongReadProtocol(Map<String, 
ReadCommand[]> readMessages) throws IOException
+    {        
+        Map<String, Row> rows = new HashMap<String, Row>();
+        /* Construct the messages to be sent to the replicas */
+        Map<String, Message[]> replicaMessages = 
constructReplicaMessages(readMessages);
+        /* Dispatch the messages to the different replicas */
+        MultiQuorumResponseHandler cb = dispatchMessages(readMessages, 
replicaMessages);
+        try
+        {
+            Row[] rows2 = cb.get();
+            for ( Row row : rows2 )
+            {
+                rows.put(row.key(), row);
+            }
+        }
+        catch (TimeoutException e)
+        {
+            throw new RuntimeException("timeout reading keys " + 
StringUtils.join(rows.keySet(), ", "), e);
+        }
+        return rows;
+    }
+
+    /**
+     * This version is used when results for multiple keys needs to be
+     * retrieved.
+     * 
+     * @param tablename name of the table that needs to be queried
+     * @param keys keys whose values we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param columns the columns we are interested in
+     * @return a mapping of key --> Row
+     * @throws Exception
+     */
+    public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand 
readCommand) throws Exception
+    {
+        Row row = null;
+        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
+        for ( String key : keys )
+        {
+            ReadCommand readCmd = readCommand.copy();
+            readMessages.put(key, readCmd);
+        }
+        /* Performs the multiget in parallel */
+        Map<String, Row> rows = doReadProtocol(readMessages);
+        /*
+         * Do the consistency checks for the keys that are being queried
+         * in the background.
+        */
+        for ( String key : keys )
+        {
+            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(key);
+            /* Remove the local storage endpoint from the list. */
+            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+            if ( endpoints.size() > 0 && 
DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, 
readMessages.get(key));
+        }
+        return rows;
+    }
+
+    /*
+    * This function executes the read protocol locally and should be used only 
if consistency is not a concern.
+    * Read the data from the local disk and return if the row is NOT NULL. If 
the data is NULL do the read from
+    * one of the other replicas (in the same data center if possible) till we 
get the data. In the event we get
+    * the data we perform consistency checks and figure out if any repairs 
need to be done to the replicas.
+    */
+    private static Row weakReadLocal(ReadCommand command) throws IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("weakreadlocal reading " + command);
+        List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(command.key);
+        /* Remove the local storage endpoint from the list. */
+        endpoints.remove(StorageService.getLocalStorageEndPoint());
+        // TODO: throw a thrift exception if we do not have N nodes
+
+        Table table = Table.open(command.table);
+        Row row = command.getRow(table);
+
+        /*
+           * Do the consistency checks in the background and return the
+           * non NULL row.
+           */
+        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, 
command);
+        return row;
+    }
+
+    static List<String> getKeyRange(RangeCommand command)
+    {
+        long startTime = System.currentTimeMillis();
+        try
+        {
+            EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith);
+            IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
+
+            // read response
+            // TODO send more requests if we need to span multiple nodes
+            byte[] responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+            return RangeReply.read(responseBody).keys;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("error reading keyrange " + command, e);
+        }
+        finally
+        {
+            rangeStats.add(System.currentTimeMillis() - startTime);
+        }
+    }
+
    /** Mean read latency (ms) over the stats window, for JMX. */
    public double getReadLatency()
    {
        return readStats.mean();
    }

    /** Mean range-query latency (ms) over the stats window, for JMX. */
    public double getRangeLatency()
    {
        return rangeStats.mean();
    }

    /** Mean write latency (ms) over the stats window, for JMX. */
    public double getWriteLatency()
    {
        return writeStats.mean();
    }

    /** Number of reads currently in the stats window, for JMX. */
    public int getReadOperations()
    {
        return readStats.size();
    }

    /** Number of range queries currently in the stats window, for JMX. */
    public int getRangeOperations()
    {
        return rangeStats.size();
    }

    /** Number of writes currently in the stats window, for JMX. */
    public int getWriteOperations()
    {
        return writeStats.size();
    }
}


Reply via email to