Author: jbellis
Date: Mon Apr 20 20:05:07 2009
New Revision: 766838

URL: http://svn.apache.org/viewvc?rev=766838&view=rev
Log:
rename get_cf -> readColumnFamily; ReadMessage -> ReadCommand.
[Message message = ReadMessage.readMessage(readMessage) is just plain confusing]
patch by jbellis; reviewed by Eric Evans for #88

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
      - copied, changed from r766140, incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java

Copied: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java (from r766140, incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?p2=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java&p1=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java&r1=766140&r2=766838&rev=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Mon 
Apr 20 20:05:07 2009
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -38,26 +37,26 @@
  * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( 
[email protected] )
  */
 
-public class ReadMessage implements Serializable
+public class ReadCommand implements Serializable
 {
-    private static ICompactSerializer<ReadMessage> serializer_;        
+    private static ICompactSerializer<ReadCommand> serializer_;
     public static final String doRepair_ = "READ-REPAIR";
-       
+
     static
     {
-        serializer_ = new ReadMessageSerializer();
+        serializer_ = new ReadCommandSerializer();
     }
 
-    static ICompactSerializer<ReadMessage> serializer()
+    static ICompactSerializer<ReadCommand> serializer()
     {
         return serializer_;
     }
     
-    public static Message makeReadMessage(ReadMessage readMessage) throws 
IOException
+    public static Message makeReadMessage(ReadCommand readCommand) throws 
IOException
     {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        ReadMessage.serializer().serialize(readMessage, dos);
+        ReadCommand.serializer().serialize(readCommand, dos);
         Message message = new 
Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, 
StorageService.readVerbHandler_, new Object[]{bos.toByteArray()});         
         return message;
     }
@@ -71,24 +70,24 @@
     private List<String> columns_ = new ArrayList<String>();
     private boolean isDigestQuery_ = false;
         
-    private ReadMessage()
+    private ReadCommand()
     {
     }
     
-    public ReadMessage(String table, String key)
+    public ReadCommand(String table, String key)
     {
         table_ = table;
         key_ = key;
     }
 
-    public ReadMessage(String table, String key, String columnFamily_column)
+    public ReadCommand(String table, String key, String columnFamily_column)
     {
         table_ = table;
         key_ = key;
         columnFamily_column_ = columnFamily_column;
     }
     
-    public ReadMessage(String table, String key, String columnFamily, 
List<String> columns)
+    public ReadCommand(String table, String key, String columnFamily, 
List<String> columns)
     {
        table_ = table;
        key_ = key;
@@ -96,7 +95,7 @@
        columns_ = columns;
     }
     
-    public ReadMessage(String table, String key, String columnFamily_column, 
int start, int count)
+    public ReadCommand(String table, String key, String columnFamily_column, 
int start, int count)
     {
         table_ = table;
         key_ = key;
@@ -105,7 +104,7 @@
         count_ = count;
     }
 
-    public ReadMessage(String table, String key, String columnFamily_column, 
long sinceTimestamp)
+    public ReadCommand(String table, String key, String columnFamily_column, 
long sinceTimestamp)
     {
         table_ = table;
         key_ = key;
@@ -173,9 +172,9 @@
     }
 }
 
-class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
 {
-       public void serialize(ReadMessage rm, DataOutputStream dos) throws 
IOException
+       public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
        {
                dos.writeUTF(rm.table());
                dos.writeUTF(rm.key());
@@ -196,7 +195,7 @@
                }
        }
        
-    public ReadMessage deserialize(DataInputStream dis) throws IOException
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
     {
                String table = dis.readUTF();
                String key = dis.readUTF();
@@ -214,18 +213,18 @@
                        dis.readFully(bytes);
                        columns.add( new String(bytes) );
                }
-               ReadMessage rm = null;
+               ReadCommand rm = null;
                if ( columns.size() > 0 )
                {
-                       rm = new ReadMessage(table, key, columnFamily_column, 
columns);
+                       rm = new ReadCommand(table, key, columnFamily_column, 
columns);
                }
                else if( sinceTimestamp > 0 )
                {
-                       rm = new ReadMessage(table, key, columnFamily_column, 
sinceTimestamp);
+                       rm = new ReadCommand(table, key, columnFamily_column, 
sinceTimestamp);
                }
                else
                {
-                       rm = new ReadMessage(table, key, columnFamily_column, 
start, count);
+                       rm = new ReadCommand(table, key, columnFamily_column, 
start, count);
                }
                rm.setIsDigestQuery(isDigest);
        return rm;

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java 
Mon Apr 20 20:05:07 2009
@@ -19,12 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.EndPoint;
@@ -34,8 +31,6 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( 
[email protected] )
@@ -77,30 +72,30 @@
 
         try
         {
-            ReadMessage readMessage = 
ReadMessage.serializer().deserialize(readCtx.bufIn_);
-            Table table = Table.open(readMessage.table());
+            ReadCommand readCommand = 
ReadCommand.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(readCommand.table());
             Row row = null;
             long start = System.currentTimeMillis();
-            if( readMessage.columnFamily_column() == null )
-               row = table.get(readMessage.key());
+            if( readCommand.columnFamily_column() == null )
+               row = table.get(readCommand.key());
             else
             {
-               if(readMessage.getColumnNames().size() == 0)
+               if(readCommand.getColumnNames().size() == 0)
                {
-                       if(readMessage.count() > 0 && readMessage.start() >= 0)
-                               row = table.getRow(readMessage.key(), 
readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+                       if(readCommand.count() > 0 && readCommand.start() >= 0)
+                               row = table.getRow(readCommand.key(), 
readCommand.columnFamily_column(), readCommand.start(), readCommand.count());
                        else
-                               row = table.getRow(readMessage.key(), 
readMessage.columnFamily_column());
+                               row = table.getRow(readCommand.key(), 
readCommand.columnFamily_column());
                }
                else
                {
-                       row = table.getRow(readMessage.key(), 
readMessage.columnFamily_column(), readMessage.getColumnNames());               
          
+                       row = table.getRow(readCommand.key(), 
readCommand.columnFamily_column(), readCommand.getColumnNames());
                }
             }              
             logger_.info("getRow()  TIME: " + (System.currentTimeMillis() - 
start) + " ms.");
             start = System.currentTimeMillis();
             ReadResponseMessage readResponseMessage = null;
-            if(readMessage.isDigestQuery())
+            if(readCommand.isDigestQuery())
             {
                 readResponseMessage = new 
ReadResponseMessage(table.getTableName(), row.digest());
             }
@@ -108,7 +103,7 @@
             {
                 readResponseMessage = new 
ReadResponseMessage(table.getTableName(), row);
             }
-            readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+            readResponseMessage.setIsDigestQuery(readCommand.isDigestQuery());
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
@@ -126,9 +121,9 @@
             logger_.info("ReadVerbHandler  TIME 2: " + 
(System.currentTimeMillis() - start) + " ms.");
             
             /* Do read repair if header of the message says so */
-            String repair = new String( 
message.getHeader(ReadMessage.doRepair_) );
-            if ( repair.equals( ReadMessage.doRepair_ ) )
-                doReadRepair(row, readMessage);
+            String repair = new String( 
message.getHeader(ReadCommand.doRepair_) );
+            if ( repair.equals( ReadCommand.doRepair_ ) )
+                doReadRepair(row, readCommand);
         }
         catch ( IOException ex)
         {
@@ -140,29 +135,29 @@
         }
     }
     
-    private void doReadRepair(Row row, ReadMessage readMessage)
+    private void doReadRepair(Row row, ReadCommand readCommand)
     {
         if ( DatabaseDescriptor.getConsistencyCheck() )
         {
-            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(readCommand.key());
             /* Remove the local storage endpoint from the list. */ 
             endpoints.remove( StorageService.getLocalStorageEndPoint() );
             
-            if(readMessage.getColumnNames().size() == 0)
+            if(readCommand.getColumnNames().size() == 0)
             {
-                if( readMessage.start() >= 0 && readMessage.count() < 
Integer.MAX_VALUE)
+                if( readCommand.start() >= 0 && readCommand.count() < 
Integer.MAX_VALUE)
                 {                
-                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readMessage.columnFamily_column(), readMessage.start(), 
readMessage.count());                    
+                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readCommand.columnFamily_column(), readCommand.start(), 
readCommand.count());
                 }
                 
-                if( readMessage.sinceTimestamp() > 0)
+                if( readCommand.sinceTimestamp() > 0)
                 {                    
-                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readMessage.columnFamily_column(), readMessage.sinceTimestamp());    
                
+                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readCommand.columnFamily_column(), readCommand.sinceTimestamp());
                 }                
             }
             else
             {
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
readMessage.columnFamily_column(), readMessage.getColumnNames());               
                 
+                StorageService.instance().doConsistencyCheck(row, endpoints, 
readCommand.columnFamily_column(), readCommand.getColumnNames());
             }
         }
     }     

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java 
Mon Apr 20 20:05:07 2009
@@ -37,7 +37,6 @@
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Column;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.thrift.TException;
 
@@ -81,7 +80,7 @@
                }
        }
     
-       protected ColumnFamily get_cf(String tablename, String key, String 
columnFamily, List<String> columNames) throws CassandraException, TException
+       protected ColumnFamily readColumnFamily(String tablename, String key, 
String columnFamily, List<String> columNames) throws CassandraException, 
TException
        {
        ColumnFamily cfamily = null;
                try
@@ -205,7 +204,7 @@
                try
                {
                        validateTable(tablename);
-                       ColumnFamily cfamily = get_cf(tablename, key, 
columnFamily, columnNames);
+                       ColumnFamily cfamily = readColumnFamily(tablename, key, 
columnFamily, columnNames);
                        if (cfamily == null)
                        {
                                logger_.info("ERROR ColumnFamily " + 
columnFamily + " is missing.....: "
@@ -486,7 +485,7 @@
                try
                {
                        validateTable(tablename);
-                       ColumnFamily cfamily = get_cf(tablename, key, 
columnFamily, superColumnNames);
+                       ColumnFamily cfamily = readColumnFamily(tablename, key, 
columnFamily, superColumnNames);
                        if (cfamily == null)
                        {
                                logger_.info("ERROR ColumnFamily " + 
columnFamily + " is missing.....: "+"   key:" + key

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
 Mon Apr 20 20:05:07 2009
@@ -22,13 +22,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -96,9 +92,9 @@
             replicas_.add(StorageService.getLocalStorageEndPoint());
                        IAsyncCallback responseHandler = new 
DataRepairHandler(ConsistencyManager.this.replicas_.size(), 
readResponseResolver); 
                        String table = DatabaseDescriptor.getTables().get(0);
-            ReadMessage readMessage = constructReadMessage(false);
+            ReadCommand readCommand = constructReadMessage(false);
                        // ReadMessage readMessage = new ReadMessage(table, 
row_.key(), columnFamily_);
-            Message message = ReadMessage.makeReadMessage(readMessage);
+            Message message = ReadCommand.makeReadMessage(readCommand);
                        MessagingService.getMessagingInstance().sendRR(message, 
replicas_.toArray( new EndPoint[0] ), responseHandler);                 
                }
        }
@@ -187,10 +183,10 @@
        public void run()
        {
                logger_.debug(" Run the consistency checks for " + 
columnFamily_);              
-        ReadMessage readMessageDigestOnly = constructReadMessage(true);
+        ReadCommand readCommandDigestOnly = constructReadMessage(true);
                try
                {
-                       Message messageDigestOnly = 
ReadMessage.makeReadMessage(readMessageDigestOnly);
+                       Message messageDigestOnly = 
ReadCommand.makeReadMessage(readCommandDigestOnly);
                        IAsyncCallback digestResponseHandler = new 
DigestResponseHandler();
                        
MessagingService.getMessagingInstance().sendRR(messageDigestOnly, 
replicas_.toArray(new EndPoint[0]), digestResponseHandler);
                }
@@ -200,32 +196,32 @@
                }
        }
     
-    private ReadMessage constructReadMessage(boolean isDigestQuery)
+    private ReadCommand constructReadMessage(boolean isDigestQuery)
     {
-        ReadMessage readMessage = null;
+        ReadCommand readCommand = null;
         String table = DatabaseDescriptor.getTables().get(0);
         
         if(columnNames_.size() == 0)
         {
             if( start_ >= 0 && count_ < Integer.MAX_VALUE)
             {
-                readMessage = new ReadMessage(table, row_.key(), 
columnFamily_, start_, count_);
+                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_, start_, count_);
             }
             else if(sinceTimestamp_ > 0)
             {
-                readMessage = new ReadMessage(table, row_.key(), 
columnFamily_, sinceTimestamp_);
+                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_, sinceTimestamp_);
             }
             else
             {
-                readMessage = new ReadMessage(table, row_.key(), 
columnFamily_);
+                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_);
             }
         }
         else
         {
-            readMessage = new ReadMessage(table, row_.key(), columnFamily_, 
columnNames_);
+            readCommand = new ReadCommand(table, row_.key(), columnFamily_, 
columnNames_);
             
         }
-        readMessage.setIsDigestQuery(isDigestQuery);
-        return readMessage;
+        readCommand.setIsDigestQuery(isDigestQuery);
+        return readCommand;
     }
 }

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
 Mon Apr 20 20:05:07 2009
@@ -28,7 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -47,7 +47,7 @@
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
     /* This maps the keys to the original data read messages */
-    private Map<String, ReadMessage> readMessages_ = new HashMap<String, 
ReadMessage>();
+    private Map<String, ReadCommand> readMessages_ = new HashMap<String, 
ReadCommand>();
     /* This maps the key to its set of replicas */
     private Map<String, EndPoint[]> endpoints_ = new HashMap<String, 
EndPoint[]>();
     /* This maps the groupId to the individual callback for the set of 
messages */
@@ -129,10 +129,10 @@
         {
             if ( DatabaseDescriptor.getConsistencyCheck())
             {                                
-                ReadMessage readMessage = readMessages_.get(key);
-                readMessage.setIsDigestQuery(false);            
-                Message messageRepair = 
ReadMessage.makeReadMessage(readMessage);
-                EndPoint[] endpoints = 
MultiQuorumResponseHandler.this.endpoints_.get( readMessage.key() );
+                ReadCommand readCommand = readMessages_.get(key);
+                readCommand.setIsDigestQuery(false);
+                Message messageRepair = 
ReadCommand.makeReadMessage(readCommand);
+                EndPoint[] endpoints = 
MultiQuorumResponseHandler.this.endpoints_.get( readCommand.key() );
                 Message[][] messages = new Message[][]{ {messageRepair, 
messageRepair, messageRepair} };
                 EndPoint[][] epList = new EndPoint[][]{ endpoints };
                 MessagingService.getMessagingInstance().sendRR(messages, 
epList, MultiQuorumResponseHandler.this);                
@@ -140,7 +140,7 @@
         }
     }
     
-    public MultiQuorumResponseHandler(Map<String, ReadMessage> readMessages, 
Map<String, EndPoint[]> endpoints)
+    public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, 
Map<String, EndPoint[]> endpoints)
     {        
         condition_ = lock_.newCondition();
         readMessages_ = readMessages;

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java 
Mon Apr 20 20:05:07 2009
@@ -30,16 +30,14 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TouchMessage;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -145,13 +143,13 @@
         }
     }
     
-    private static Map<String, Message> constructMessages(Map<String, 
ReadMessage> readMessages) throws IOException
+    private static Map<String, Message> constructMessages(Map<String, 
ReadCommand> readMessages) throws IOException
     {
         Map<String, Message> messages = new HashMap<String, Message>();
         Set<String> keys = readMessages.keySet();        
         for ( String key : keys )
         {
-            Message message = ReadMessage.makeReadMessage( 
readMessages.get(key) );
+            Message message = ReadCommand.makeReadMessage( 
readMessages.get(key) );
             messages.put(key, message);
         }        
         return messages;
@@ -182,7 +180,7 @@
      * @throws IOException
      * @throws TimeoutException
      */
-    public static Map<String, Row> doReadProtocol(Map<String, ReadMessage> 
readMessages) throws IOException,TimeoutException
+    public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> 
readMessages) throws IOException,TimeoutException
     {
         Map<String, Row> rows = new HashMap<String, Row>();
         Set<String> keys = readMessages.keySet();
@@ -206,14 +204,14 @@
         return rows;
     }
     
-    public static Row doReadProtocol(String key, ReadMessage readMessage) 
throws IOException,TimeoutException
+    public static Row doReadProtocol(String key, ReadCommand readCommand) 
throws IOException,TimeoutException
     {
         Row row = null;
         EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(key);        
         if(endPoint != null)
         {
-            Message message = ReadMessage.makeReadMessage(readMessage);
-            message.addHeader(ReadMessage.doRepair_, 
ReadMessage.doRepair_.getBytes());
+            Message message = ReadCommand.makeReadMessage(readCommand);
+            message.addHeader(ReadCommand.doRepair_, 
ReadCommand.doRepair_.getBytes());
             IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, endPoint);
             Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
             byte[] body = (byte[])result[0];
@@ -310,9 +308,9 @@
         }   
         if(!foundLocal && consistencyLevel == 
StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, 
columnNames);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, 
columnNames);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -349,9 +347,9 @@
         }   
         if(!foundLocal && consistencyLevel == 
StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, start, 
count);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, start, 
count);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -408,9 +406,9 @@
         }   
         if(!foundLocal && consistencyLevel == 
StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, 
sinceTimestamp);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, 
sinceTimestamp);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -436,12 +434,12 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = new ReadMessage(tablename, key, 
columnFamily, columns);               
+        ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, columns);
         
-        ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, 
columnFamily, columns);     
-        readMessageDigestOnly.setIsDigestQuery(true);        
+        ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key, 
columnFamily, columns);
+        readCommandDigestOnly.setIsDigestQuery(true);
         
-        Row row = StorageProxy.doStrongReadProtocol(key, readMessage, 
readMessageDigestOnly);
+        Row row = StorageProxy.doStrongReadProtocol(key, readCommand, 
readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - 
startTime) + " ms.");     
         return row;
     }
@@ -466,27 +464,27 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = null;
-        ReadMessage readMessageDigestOnly = null;
+        ReadCommand readCommand = null;
+        ReadCommand readCommandDigestOnly = null;
         if( start >= 0 && count < Integer.MAX_VALUE)
         {
-            readMessage = new ReadMessage(tablename, key, columnFamily, start, 
count);
+            readCommand = new ReadCommand(tablename, key, columnFamily, start, 
count);
         }
         else
         {
-            readMessage = new ReadMessage(tablename, key, columnFamily);
+            readCommand = new ReadCommand(tablename, key, columnFamily);
         }
-        Message message = ReadMessage.makeReadMessage(readMessage);
+        Message message = ReadCommand.makeReadMessage(readCommand);
         if( start >= 0 && count < Integer.MAX_VALUE)
         {
-            readMessageDigestOnly = new ReadMessage(tablename, key, 
columnFamily, start, count);
+            readCommandDigestOnly = new ReadCommand(tablename, key, 
columnFamily, start, count);
         }
         else
         {
-            readMessageDigestOnly = new ReadMessage(tablename, key, 
columnFamily);
+            readCommandDigestOnly = new ReadCommand(tablename, key, 
columnFamily);
         }
-        readMessageDigestOnly.setIsDigestQuery(true);        
-        Row row = doStrongReadProtocol(key, readMessage, 
readMessageDigestOnly);
+        readCommandDigestOnly.setIsDigestQuery(true);
+        Row row = doStrongReadProtocol(key, readCommand, 
readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - 
startTime) + " ms.");
         return row;
     }
@@ -507,27 +505,27 @@
         Map<String, Row> rows = new HashMap<String, Row>();
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        Map<String, ReadMessage[]> readMessages = new HashMap<String, 
ReadMessage[]>();        
+        Map<String, ReadCommand[]> readMessages = new HashMap<String, 
ReadCommand[]>();
         for (String key : keys )
         {
-            ReadMessage[] readMessage = new ReadMessage[2];
+            ReadCommand[] readCommand = new ReadCommand[2];
             if( start >= 0 && count < Integer.MAX_VALUE)
             {
-                readMessage[0] = new ReadMessage(tablename, key, columnFamily, 
start, count);
+                readCommand[0] = new ReadCommand(tablename, key, columnFamily, 
start, count);
             }
             else
             {
-                readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+                readCommand[0] = new ReadCommand(tablename, key, columnFamily);
             }            
             if( start >= 0 && count < Integer.MAX_VALUE)
             {
-                readMessage[1] = new ReadMessage(tablename, key, columnFamily, 
start, count);
+                readCommand[1] = new ReadCommand(tablename, key, columnFamily, 
start, count);
             }
             else
             {
-                readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+                readCommand[1] = new ReadCommand(tablename, key, columnFamily);
             }
-            readMessage[1].setIsDigestQuery(true);
+            readCommand[1].setIsDigestQuery(true);
         }        
         rows = doStrongReadProtocol(readMessages);         
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - 
startTime) + " ms.");
@@ -538,13 +536,13 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = null;
-        ReadMessage readMessageDigestOnly = null;
-        readMessage = new ReadMessage(tablename, key, columnFamily, 
sinceTimestamp);
-        Message message = ReadMessage.makeReadMessage(readMessage);
-        readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, 
sinceTimestamp);
-        readMessageDigestOnly.setIsDigestQuery(true);        
-        Row row = doStrongReadProtocol(key, readMessage, 
readMessageDigestOnly);
+        ReadCommand readCommand = null;
+        ReadCommand readCommandDigestOnly = null;
+        readCommand = new ReadCommand(tablename, key, columnFamily, 
sinceTimestamp);
+        Message message = ReadCommand.makeReadMessage(readCommand);
+        readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, 
sinceTimestamp);
+        readCommandDigestOnly.setIsDigestQuery(true);
+        Row row = doStrongReadProtocol(key, readCommand, 
readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - 
startTime) + " ms.");
         return row;
     }
@@ -555,11 +553,11 @@
      *  param @ readMessage - the read message to get the actual data
      *  param @ readMessageDigest - the read message to get the digest.
     */
-    private static Row doStrongReadProtocol(String key, ReadMessage 
readMessage, ReadMessage readMessageDigest) throws IOException, TimeoutException
+    private static Row doStrongReadProtocol(String key, ReadCommand 
readCommand, ReadCommand readCommandDigest) throws IOException, TimeoutException
     {
         Row row = null;
-        Message message = ReadMessage.makeReadMessage(readMessage);
-        Message messageDigestOnly = 
ReadMessage.makeReadMessage(readMessageDigest);
+        Message message = ReadCommand.makeReadMessage(readCommand);
+        Message messageDigestOnly = 
ReadCommand.makeReadMessage(readCommandDigest);
         
         IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver();
         QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(
@@ -607,9 +605,9 @@
                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
new QuorumResponseHandler<Row>(
                            DatabaseDescriptor.getReplicationFactor(),
                            readResponseResolverRepair);
-                   readMessage.setIsDigestQuery(false);
+                   readCommand.setIsDigestQuery(false);
                    logger_.info("DigestMismatchException: " + key);            
-                   Message messageRepair = 
ReadMessage.makeReadMessage(readMessage);
+                   Message messageRepair = 
ReadCommand.makeReadMessage(readCommand);
                    
MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, 
quorumResponseHandlerRepair);
                    try
                    {
@@ -628,7 +626,7 @@
         return row;
     }
     
-    private static Map<String, Message[]> constructReplicaMessages(Map<String, 
ReadMessage[]> readMessages) throws IOException
+    private static Map<String, Message[]> constructReplicaMessages(Map<String, 
ReadCommand[]> readMessages) throws IOException
     {
         Map<String, Message[]> messages = new HashMap<String, Message[]>();
         Set<String> keys = readMessages.keySet();
@@ -636,21 +634,21 @@
         for ( String key : keys )
         {
             Message[] msg = new 
Message[DatabaseDescriptor.getReplicationFactor()];
-            ReadMessage[] readMessage = readMessages.get(key);
-            msg[0] = ReadMessage.makeReadMessage( readMessage[0] );            
+            ReadCommand[] readCommand = readMessages.get(key);
+            msg[0] = ReadCommand.makeReadMessage( readCommand[0] );
             for ( int i = 1; i < msg.length; ++i )
             {
-                msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+                msg[i] = ReadCommand.makeReadMessage( readCommand[1] );
             }
         }        
         return messages;
     }
     
-    private static MultiQuorumResponseHandler dispatchMessages(Map<String, 
ReadMessage[]> readMessages, Map<String, Message[]> messages) throws IOException
+    private static MultiQuorumResponseHandler dispatchMessages(Map<String, 
ReadCommand[]> readMessages, Map<String, Message[]> messages) throws IOException
     {
         Set<String> keys = messages.keySet();
         /* This maps the keys to the original data read messages */
-        Map<String, ReadMessage> readMessage = new HashMap<String, 
ReadMessage>();
+        Map<String, ReadCommand> readMessage = new HashMap<String, 
ReadCommand>();
         /* This maps the keys to their respective endpoints/replicas */
         Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
         /* Groups the messages that need to be sent to the individual keys */
@@ -700,7 +698,7 @@
     *  @return map containing key ---> Row
     *  @throws IOException, TimeoutException
    */
-    private static Map<String, Row> doStrongReadProtocol(Map<String, 
ReadMessage[]> readMessages) throws IOException
+    private static Map<String, Row> doStrongReadProtocol(Map<String, 
ReadCommand[]> readMessages) throws IOException
     {        
         Map<String, Row> rows = new HashMap<String, Row>();
         /* Construct the messages to be sent to the replicas */
@@ -769,11 +767,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, 
ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, 
columnFamily, columns);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, columns);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -850,11 +848,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, 
ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, 
columnFamily, start, count);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, start, count);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -919,11 +917,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, 
ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, 
columnFamily, sinceTimestamp);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, sinceTimestamp);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java 
Mon Apr 20 20:05:07 2009
@@ -43,7 +43,7 @@
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -880,10 +880,10 @@
                                key = user + ":1";
                        }
 
-                       ReadMessage readMessage = new ReadMessage(tablename_, 
key);
+                       ReadCommand readCommand = new ReadCommand(tablename_, 
key);
                        Message message = new Message(from_, 
StorageService.readStage_,
                                        StorageService.readVerbHandler_,
-                                       new Object[] { readMessage });
+                                       new Object[] {readCommand});
                        IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(
                                        message, to_);
                        Object[] result = iar.get();

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Mon 
Apr 20 20:05:07 2009
@@ -19,10 +19,7 @@
 package org.apache.cassandra.test;
 
 import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
@@ -30,17 +27,13 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.analytics.AnalyticsContext;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.RowMutationMessage;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.Cassandra;
@@ -147,7 +140,7 @@
         
     }  
        
-    public void readLoad(ReadMessage readMessage)
+    public void readLoad(ReadCommand readCommand)
     {
                IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver();
                QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(
@@ -155,7 +148,7 @@
                                readResponseResolver);
                Message message = new Message(from_, StorageService.readStage_,
                                StorageService.readVerbHandler_,
-                               new Object[] { readMessage });
+                               new Object[] {readCommand});
                MessagingService.getMessagingInstance().sendOneWay(message, 
to_);
                /*IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, to_);
                try
@@ -187,7 +180,7 @@
                    String stringKey = new Integer(key).toString();
                    stringKey = stringKey + keyFix_ ;
                int j = random.nextInt(columns) + 1;
-                   ReadMessage rm = new ReadMessage(tablename_, stringKey, 
columnFamilyColumn_ + ":" + columnFix_ + j);
+                   ReadCommand rm = new ReadCommand(tablename_, stringKey, 
columnFamilyColumn_ + ":" + columnFix_ + j);
                    readLoad(rm);
                                if ( requestsPerSecond_ > 1000)
                                        Thread.sleep(0, 
1000000000/requestsPerSecond_);
@@ -257,7 +250,7 @@
                    stringKey = stringKey + keyFix_ ;
                int i = random.nextInt(superColumns) + 1;
                int j = random.nextInt(columns) + 1;
-                   ReadMessage rm = new ReadMessage(tablename_, stringKey, 
columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+                   ReadCommand rm = new ReadCommand(tablename_, stringKey, 
columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
                    readLoad(rm);
                        }
                }

Modified: 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java 
(original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java 
Mon Apr 20 20:05:07 2009
@@ -16,16 +16,16 @@
         colList.add("col1");
         colList.add("col2");
 
-        ReadMessage rm = new ReadMessage("Table1", "row1", "foo", colList);
-        ReadMessage rm2 = serializeAndDeserializeReadMessage(rm);
+        ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
+        ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
 
         assert rm2.toString().equals(rm.toString());
     }
 
-    private ReadMessage serializeAndDeserializeReadMessage(ReadMessage rm)
+    private ReadCommand serializeAndDeserializeReadMessage(ReadCommand rm)
     {
-        ReadMessage rm2 = null;
-        ReadMessageSerializer rms = (ReadMessageSerializer) 
ReadMessage.serializer();
+        ReadCommand rm2 = null;
+        ReadCommandSerializer rms = (ReadCommandSerializer) 
ReadCommand.serializer();
         DataOutputBuffer dos = new DataOutputBuffer();
         DataInputBuffer dis = new DataInputBuffer();
 


Reply via email to