Author: jbellis
Date: Sat Apr 25 15:46:19 2009
New Revision: 768553

URL: http://svn.apache.org/viewvc?rev=768553&view=rev
Log:
split ReadCommand into separate classes for each type of command.  patch by Jun Rao; reviewed by jbellis

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
 Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
@@ -100,7 +101,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, 
key, columnFamily_column, offset_, limit_);
+            ReadCommand readCommand = new 
SliceReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, 
limit_);
             row = StorageProxy.readProtocol(readCommand, 
StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
 Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
@@ -69,7 +70,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, 
key, cfMetaData_.cfName, offset_, limit_);
+            ReadCommand readCommand = new 
SliceReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, 
limit_);
             row = StorageProxy.readProtocol(readCommand, 
StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
 Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.ReadCommand;
@@ -85,7 +86,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, 
key, columnFamily_column, -1, Integer.MAX_VALUE);
+            ReadCommand readCommand = new 
ColumnReadCommand(cfMetaData_.tableName, key, columnFamily_column);
             row = StorageProxy.readProtocol(readCommand, 
StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Added: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java?rev=768553&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java 
(added)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java 
Sat Apr 25 15:46:19 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnReadCommand extends ReadCommand
+{
+    public final String columnFamilyColumn;
+
+    public ColumnReadCommand(String table, String key, String 
columnFamilyColumn)
+    {
+        super(table, key, CMD_TYPE_GET_COLUMN);
+        this.columnFamilyColumn = columnFamilyColumn;
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        String[] values = 
RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+        return values[0];
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new ColumnReadCommand(table, key, 
columnFamilyColumn);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException    
+    {
+        return table.getRow(key, columnFamilyColumn);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetColumnReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
+               ')';
+    }
+}
+
+class ColumnReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
+    { 
+        ColumnReadCommand realRM = (ColumnReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamilyColumn);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily_column = dis.readUTF();
+        ColumnReadCommand rm = new ColumnReadCommand(table, key, 
columnFamily_column);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java?rev=768553&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
 (added)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
 Sat Apr 25 15:46:19 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnsSinceReadCommand extends ReadCommand
+{
+    public final String columnFamily;
+    public final long sinceTimestamp;
+
+    public ColumnsSinceReadCommand(String table, String key, String 
columnFamily, long sinceTimestamp)
+    {
+        super(table, key, CMD_TYPE_GET_COLUMNS_SINCE);
+        this.columnFamily = columnFamily;
+        this.sinceTimestamp = sinceTimestamp;
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        return columnFamily;
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new ColumnsSinceReadCommand(table, key, 
columnFamily, sinceTimestamp);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException
+    {        
+        return table.getRow(key, columnFamily, sinceTimestamp);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetColumnsSinceMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", sinceTimestamp='" + sinceTimestamp + '\'' +
+               ')';
+    }
+
+}
+
+class ColumnsSinceReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
+    {
+        ColumnsSinceReadCommand realRM = (ColumnsSinceReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamily);
+        dos.writeLong(realRM.sinceTimestamp);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily = dis.readUTF();
+        long sinceTimestamp = dis.readLong();
+
+        ColumnsSinceReadCommand rm = new ColumnsSinceReadCommand(table, key, 
columnFamily, sinceTimestamp);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Sat 
Apr 25 15:46:19 2009
@@ -22,26 +22,24 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 
 
-/**
- * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( 
[email protected] )
- */
-
-public class ReadCommand
+public abstract class ReadCommand
 {
     public static final String DO_REPAIR = "READ-REPAIR";
-
+    public static final byte CMD_TYPE_GET_ROW=1;
+    public static final byte CMD_TYPE_GET_COLUMN=2;
+    public static final byte CMD_TYPE_GET_SLICE_BY_NAMES=3;
+    public static final byte CMD_TYPE_GET_COLUMNS_SINCE=4;
+    public static final byte CMD_TYPE_GET_SLICE=5;
+    public static final String EMPTY_CF = "";
+    
     private static ReadCommandSerializer serializer = new 
ReadCommandSerializer();
 
     public static ReadCommandSerializer serializer()
@@ -49,8 +47,6 @@
         return serializer;
     }
 
-    private static List<String> EMPTY_COLUMNS = Arrays.asList(new String[0]);
-
     public Message makeReadMessage() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -60,57 +56,17 @@
     }
 
     public final String table;
-
     public final String key;
+    private boolean isDigestQuery = false;    
+    protected final byte commandType;
 
-    public final String columnFamilyColumn;
-
-    public final int start;
-
-    public final int count;
-
-    public final long sinceTimestamp;
-
-    public final List<String> columnNames;
-
-    private boolean isDigestQuery = false;
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, 
int start, int count, long sinceTimestamp, List<String> columnNames)
+    protected ReadCommand(String table, String key, byte cmdType)
     {
         this.table = table;
         this.key = key;
-        this.columnFamilyColumn = columnFamilyColumn;
-        this.start = start;
-        this.count = count;
-        this.sinceTimestamp = sinceTimestamp;
-        this.columnNames = Collections.unmodifiableList(columnNames);
+        this.commandType = cmdType;
     }
-
-    public ReadCommand(String table, String key)
-    {
-        this(table, key, null, -1, -1, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, 
List<String> columnNames)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, -1, columnNames);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, 
int start, int count)
-    {
-        this(table, key, columnFamilyColumn, start, count, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, 
long sinceTimestamp)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, sinceTimestamp, 
EMPTY_COLUMNS);
-    }
-
+    
     public boolean isDigestQuery()
     {
         return isDigestQuery;
@@ -121,99 +77,37 @@
         this.isDigestQuery = isDigestQuery;
     }
 
-    public ReadCommand copy()
-    {
-        return new ReadCommand(table, key, columnFamilyColumn, start, count, 
sinceTimestamp, columnNames);
-    }
-
-    public Row getRow(Table table) throws IOException
-    {
-        if (!columnNames.isEmpty())
-        {
-            return table.getRow(key, columnFamilyColumn, columnNames);
-        }
-
-        if (sinceTimestamp > 0)
-        {
-            return table.getRow(key, columnFamilyColumn, sinceTimestamp);
-        }
-
-        if (start > 0 || (count > 0 && count < Integer.MAX_VALUE))
-        {
-            return table.getRow(key, columnFamilyColumn, start, count);
-        }
+    public abstract String getColumnFamilyName();
+    
+    public abstract ReadCommand copy();
 
-        return table.getRow(key, columnFamilyColumn);
-    }
-
-    public String toString()
-    {
-        return "ReadMessage(" +
-               "table='" + table + '\'' +
-               ", key='" + key + '\'' +
-               ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
-               ", start=" + start +
-               ", count=" + count +
-               ", sinceTimestamp=" + sinceTimestamp +
-               ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
-               ')';
-    }
+    public abstract Row getRow(Table table) throws IOException;
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
 {
+    private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = 
new HashMap<Byte, ReadCommandSerializer>(); 
+    static 
+    {
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_ROW, new 
RowReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMN, new 
ColumnReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new 
SliceByNamesReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMNS_SINCE, new 
ColumnsSinceReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new 
SliceReadCommandSerializer());
+    }
+
+
     public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
     {
-        dos.writeUTF(rm.table);
-        dos.writeUTF(rm.key);
-        dos.writeUTF(rm.columnFamilyColumn);
-        dos.writeInt(rm.start);
-        dos.writeInt(rm.count);
-        dos.writeLong(rm.sinceTimestamp);
-        dos.writeBoolean(rm.isDigestQuery());
-        dos.writeInt(rm.columnNames.size());
-        if (rm.columnNames.size() > 0)
-        {
-            for (String cName : rm.columnNames)
-            {
-                dos.writeInt(cName.getBytes().length);
-                dos.write(cName.getBytes());
-            }
-        }
+        dos.writeByte(rm.commandType);
+        ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
+        ser.serialize(rm, dos);
     }
 
     public ReadCommand deserialize(DataInputStream dis) throws IOException
     {
-        String table = dis.readUTF();
-        String key = dis.readUTF();
-        String columnFamily_column = dis.readUTF();
-        int start = dis.readInt();
-        int count = dis.readInt();
-        long sinceTimestamp = dis.readLong();
-        boolean isDigest = dis.readBoolean();
-
-        int size = dis.readInt();
-        List<String> columns = new ArrayList<String>();
-        for (int i = 0; i < size; ++i)
-        {
-            byte[] bytes = new byte[dis.readInt()];
-            dis.readFully(bytes);
-            columns.add(new String(bytes));
-        }
-        ReadCommand rm = null;
-        if (columns.size() > 0)
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, columns);
-        }
-        else if (sinceTimestamp > 0)
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, 
sinceTimestamp);
-        }
-        else
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, start, 
count);
-        }
-        rm.setDigestQuery(isDigest);
-        return rm;
+        byte msgType = dis.readByte();
+        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
     }
+        
 }

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java 
Sat Apr 25 15:46:19 2009
@@ -118,28 +118,11 @@
     
     private void doReadRepair(Row row, ReadCommand readCommand)
     {
-        if ( DatabaseDescriptor.getConsistencyCheck() )
-        {
-            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+        /* Remove the local storage endpoint from the list. */ 
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
             
-            if(readCommand.columnNames.size() == 0)
-            {
-                if( readCommand.start >= 0 && readCommand.count < 
Integer.MAX_VALUE)
-                {                
-                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readCommand.columnFamilyColumn, readCommand.start, 
readCommand.count);
-                }
-                
-                if( readCommand.sinceTimestamp > 0)
-                {                    
-                    StorageService.instance().doConsistencyCheck(row, 
endpoints, readCommand.columnFamilyColumn, readCommand.sinceTimestamp);
-                }                
-            }
-            else
-            {
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
readCommand.columnFamilyColumn, readCommand.columnNames);
-            }
-        }
+        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, 
readCommand);
     }     
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java 
(added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java 
Sat Apr 25 15:46:19 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class RowReadCommand extends ReadCommand
+{
+    public RowReadCommand(String table, String key)
+    {
+        super(table, key, CMD_TYPE_GET_ROW);
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        return null;
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new RowReadCommand(table, key);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException    
+    {
+        return table.get(key);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetColumnReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ')';
+    }
+
+}
+
+class RowReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
+    { 
+        RowReadCommand realRM = (RowReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        RowReadCommand rm = new RowReadCommand(table, key);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=768553&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
 (added)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
 Sat Apr 25 15:46:19 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+public class SliceByNamesReadCommand extends ReadCommand
+{
+    public final String columnFamily;
+    public final List<String> columnNames;
+
+    public SliceByNamesReadCommand(String table, String key, String 
columnFamily, List<String> columnNames)
+    {
+        super(table, key, CMD_TYPE_GET_SLICE_BY_NAMES);
+        this.columnFamily = columnFamily;
+        this.columnNames = Collections.unmodifiableList(columnNames);
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        return columnFamily;
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, 
columnFamily, columnNames);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+    
+    @Override
+    public Row getRow(Table table) throws IOException
+    {        
+        return table.getRow(key, columnFamily, columnNames);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetSliceByNamesReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
+               ')';
+    }
+
+}
+
+class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws 
IOException
+    {
+        SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamily);
+        dos.writeInt(realRM.columnNames.size());
+        if (realRM.columnNames.size() > 0)
+        {
+            for (String cName : realRM.columnNames)
+            {
+                dos.writeInt(cName.getBytes().length);
+                dos.write(cName.getBytes());
+            }
+        }
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily = dis.readUTF();
+
+        int size = dis.readInt();
+        List<String> columns = new ArrayList<String>();
+        for (int i = 0; i < size; ++i)
+        {
+            byte[] bytes = new byte[dis.readInt()];
+            dis.readFully(bytes);
+            columns.add(new String(bytes));
+        }
+        SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, 
columnFamily, columns);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java 
(added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java 
Sat Apr 25 15:46:19 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Read command that asks for a slice of a column family, described by a
+ * start value and a count that are handed to Table.getRow unchanged.
+ */
+public class SliceReadCommand extends ReadCommand
+{
+    public final String columnFamily;
+    public final int start;
+    public final int count;
+
+    /**
+     * @param table        table name, passed to the ReadCommand constructor
+     * @param key          row key, passed to the ReadCommand constructor
+     * @param columnFamily column family specification to slice
+     * @param start        start value forwarded to Table.getRow
+     * @param count        count value forwarded to Table.getRow
+     */
+    public SliceReadCommand(String table, String key, String columnFamily, int start, int count)
+    {
+        super(table, key, CMD_TYPE_GET_SLICE);
+        this.columnFamily = columnFamily;
+        this.start = start;
+        this.count = count;
+    }
+
+    /** Returns the column family string this command was created with. */
+    @Override
+    public String getColumnFamilyName()
+    {
+        return columnFamily;
+    }
+
+    /** Builds a new command with the same fields and the same digest flag. */
+    @Override
+    public ReadCommand copy()
+    {
+        SliceReadCommand duplicate = new SliceReadCommand(table, key, columnFamily, start, count);
+        duplicate.setDigestQuery(isDigestQuery());
+        return duplicate;
+    }
+
+    /** Runs the slice against the given table for this command's key. */
+    @Override
+    public Row getRow(Table table) throws IOException
+    {
+        return table.getRow(key, columnFamily, start, count);
+    }
+
+    /** Renders every field of the command, each value wrapped in single quotes. */
+    @Override
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder("GetSliceReadMessage(");
+        text.append("table='").append(table).append('\'');
+        text.append(", key='").append(key).append('\'');
+        text.append(", columnFamily='").append(columnFamily).append('\'');
+        text.append(", start='").append(start).append('\'');
+        text.append(", count='").append(count).append('\'');
+        text.append(')');
+        return text.toString();
+    }
+}
+
+/**
+ * Serializer for {@link SliceReadCommand}. Fields are written and read in
+ * the same order: digest flag, table, key, column family, start, count.
+ */
+class SliceReadCommandSerializer extends ReadCommandSerializer
+{
+    /**
+     * Writes the command's fields to the stream.
+     *
+     * @param rm  the command to write; must be a SliceReadCommand
+     * @param dos the stream to write to
+     */
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        final SliceReadCommand slice = (SliceReadCommand) rm;
+
+        dos.writeBoolean(slice.isDigestQuery());
+        dos.writeUTF(slice.table);
+        dos.writeUTF(slice.key);
+        dos.writeUTF(slice.columnFamily);
+        dos.writeInt(slice.start);
+        dos.writeInt(slice.count);
+    }
+
+    /**
+     * Reads the fields back in the order serialize wrote them and rebuilds
+     * the command, restoring its digest flag.
+     *
+     * @param dis the stream to read from
+     * @return the reconstructed SliceReadCommand
+     */
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        final boolean digestOnly = dis.readBoolean();
+        final String tableName = dis.readUTF();
+        final String rowKey = dis.readUTF();
+        final String cfName = dis.readUTF();
+        final int sliceStart = dis.readInt();
+        final int sliceCount = dis.readInt();
+
+        final SliceReadCommand command = new SliceReadCommand(tableName, rowKey, cfName, sliceStart, sliceCount);
+        command.setDigestQuery(digestOnly);
+        return command;
+    }
+}

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java 
Sat Apr 25 15:46:19 2009
@@ -36,6 +36,10 @@
 import org.apache.cassandra.cql.common.CqlResult;
 import org.apache.cassandra.cql.driver.CqlDriver;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
+import org.apache.cassandra.db.ColumnsSinceReadCommand;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -102,12 +106,8 @@
     
        protected ColumnFamily readColumnFamily(ReadCommand command) throws 
InvalidRequestException
     {
-        String[] values = 
RowMutation.getColumnAndColumnFamily(command.columnFamilyColumn);
-        if( values.length < 1 )
-        {
-            throw new ColumnFamilyNotDefinedException("Empty column Family is 
invalid.");
-        }
-        validateCommand(command.key, command.table, values[0]);
+        String cfName = command.getColumnFamilyName();
+        validateCommand(command.key, command.table, cfName);
 
         Row row;
         try
@@ -127,7 +127,7 @@
         {
             return null;
         }
-        return row.getColumnFamily(values[0]);
+        return row.getColumnFamily(cfName);
        }
 
     public List<column_t> thriftifyColumns(Collection<IColumn> columns)
@@ -156,7 +156,7 @@
         long startTime = System.currentTimeMillis();
         try
         {
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_column, timeStamp));
+            ColumnFamily cfamily = readColumnFamily(new 
ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
             String[] values = 
RowMutation.getColumnAndColumnFamily(columnFamily_column);
             if (cfamily == null)
             {
@@ -188,7 +188,7 @@
         long startTime = System.currentTimeMillis();
         try
         {
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily, columnNames));
+            ColumnFamily cfamily = readColumnFamily(new 
SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
             if (cfamily == null)
             {
                 return EMPTY_COLUMNS;
@@ -209,7 +209,7 @@
                try
                {
                String[] values = 
RowMutation.getColumnAndColumnFamily(columnFamily_column);
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_column, start, count));
+            ColumnFamily cfamily = readColumnFamily(new 
SliceReadCommand(tablename, key, columnFamily_column, start, count));
             if (cfamily == null)
                        {
                 return EMPTY_COLUMNS;
@@ -241,7 +241,7 @@
         {
             throw new InvalidRequestException("get_column requires both parts 
of columnfamily:column");
         }
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new 
ColumnReadCommand(tablename, key, columnFamily_column));
         if (cfamily == null)
         {
             throw new NotFoundException();
@@ -277,7 +277,7 @@
     public int get_column_count(String tablename, String key, String 
columnFamily_column) throws InvalidRequestException
     {
         String[] values = 
RowMutation.getColumnAndColumnFamily(columnFamily_column);
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new 
SliceReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
         if (cfamily == null)
         {
             return 0;
@@ -367,7 +367,7 @@
         long startTime = System.currentTimeMillis();
                try
                {
-                       ColumnFamily cfamily = readColumnFamily(new 
ReadCommand(tablename, key, columnFamily, superColumnNames));
+                       ColumnFamily cfamily = readColumnFamily(new 
SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
                        if (cfamily == null)
                        {
                 return EMPTY_SUPERCOLUMNS;
@@ -405,7 +405,7 @@
 
     public List<superColumn_t> get_slice_super(String tablename, String key, 
String columnFamily_superColumnName, int start, int count) throws 
InvalidRequestException
     {
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_superColumnName, start, count));
+        ColumnFamily cfamily = readColumnFamily(new 
SliceReadCommand(tablename, key, columnFamily_superColumnName, start, count));
         if (cfamily == null)
         {
             return EMPTY_SUPERCOLUMNS;
@@ -416,7 +416,7 @@
     
     public superColumn_t get_superColumn(String tablename, String key, String 
columnFamily_column) throws InvalidRequestException, NotFoundException
     {
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, 
key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new 
ColumnReadCommand(tablename, key, columnFamily_column));
         if (cfamily == null)
         {
             throw new NotFoundException();

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
 Sat Apr 25 15:46:19 2009
@@ -149,41 +149,17 @@
        private static ICachetable<String, String> readRepairTable_ = new 
Cachetable<String, String>(scheduledTimeMillis_);
        private Row row_;
        protected List<EndPoint> replicas_;
-       private String columnFamily_;
-       private int start_;
-       private int count_;
-       private long sinceTimestamp_;
-       private List<String> columnNames_ = new ArrayList<String>();    
        
-    public ConsistencyManager(Row row_, List<EndPoint> replicas_, String 
columnFamily_, int start_, int count_, long sinceTimestamp_, List<String> 
columnNames_)
+       private ReadCommand readCommand_;
+       
+    /**
+     * Creates a consistency check for one row.
+     *
+     * @param row_        the row to check; stored in the row_ field
+     * @param replicas_   the endpoints to check against; stored in the replicas_ field
+     * @param readCommand the command that produced the row; a copy of it is
+     *                    used when the digest read is constructed
+     */
+    public ConsistencyManager(Row row_, List<EndPoint> replicas_, ReadCommand readCommand)
     {
         // The parameters shadow the fields of the same name, so these two
         // assignments must stay or both fields remain null.
         this.row_ = row_;
         this.replicas_ = replicas_;
-        this.columnFamily_ = columnFamily_;
-        this.start_ = start_;
-        this.count_ = count_;
-        this.sinceTimestamp_ = sinceTimestamp_;
-        this.columnNames_ = columnNames_;
+        this.readCommand_ = readCommand;
     }
 
-    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, 
List<String> columns)
-       {
-        this(row, replicas, columnFamily, 0, 0, 0, columns);
-       }
-
-       ConsistencyManager(Row row, List<EndPoint> replicas, String 
columnFamily, int start, int count)
-       {
-        this(row, replicas, columnFamily, start, count, 0, null);
-       }
-
-       ConsistencyManager(Row row, List<EndPoint> replicas, String 
columnFamily, long sinceTimestamp)
-       {
-        this(row, replicas, columnFamily, 0, 0, sinceTimestamp, null);
-       }
-
        public void run()
        {
-               logger_.debug(" Run the consistency checks for " + 
columnFamily_);              
+               logger_.debug(" Run the consistency checks for " + 
readCommand_.getColumnFamilyName());         
         ReadCommand readCommandDigestOnly = constructReadMessage(true);
                try
                {
@@ -199,29 +175,7 @@
     
     private ReadCommand constructReadMessage(boolean isDigestQuery)
     {
-        ReadCommand readCommand = null;
-        String table = DatabaseDescriptor.getTables().get(0);
-        
-        if(columnNames_.size() == 0)
-        {
-            if( start_ >= 0 && count_ < Integer.MAX_VALUE)
-            {
-                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_, start_, count_);
-            }
-            else if(sinceTimestamp_ > 0)
-            {
-                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_, sinceTimestamp_);
-            }
-            else
-            {
-                readCommand = new ReadCommand(table, row_.key(), 
columnFamily_);
-            }
-        }
-        else
-        {
-            readCommand = new ReadCommand(table, row_.key(), columnFamily_, 
columnNames_);
-            
-        }
+        ReadCommand readCommand = readCommand_.copy();
         readCommand.setDigestQuery(isDigestQuery);
         return readCommand;
     }

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java 
Sat Apr 25 15:46:19 2009
@@ -334,21 +334,21 @@
         return row;
     }
 
-    public static Map<String, Row> readProtocol(String tablename, String[] 
keys, String columnFamily, int start, int count, 
StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    public static Map<String, Row> readProtocol(String[] keys, ReadCommand 
readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
     {
         Map<String, Row> rows = new HashMap<String, Row>();        
         switch ( consistencyLevel )
         {
             case WEAK:
-                rows = weakReadProtocol(tablename, keys, columnFamily, start, 
count);
+                rows = weakReadProtocol(keys, readCommand);
                 break;
                 
             case STRONG:
-                rows = strongReadProtocol(tablename, keys, columnFamily, 
start, count);
+                rows = strongReadProtocol(keys, readCommand);
                 break;
                 
             default:
-                rows = weakReadProtocol(tablename, keys, columnFamily, start, 
count);
+                rows = weakReadProtocol(keys, readCommand);
                 break;
         }
         return rows;
@@ -365,7 +365,7 @@
      * @throws IOException
      * @throws TimeoutException
      */
-    public static Map<String, Row> strongReadProtocol(String tablename, 
String[] keys, String columnFamily, int start, int count) throws IOException, 
TimeoutException
+    public static Map<String, Row> strongReadProtocol(String[] keys, 
ReadCommand readCommand) throws IOException, TimeoutException
     {       
         Map<String, Row> rows = new HashMap<String, Row>();
         long startTime = System.currentTimeMillis();        
@@ -374,23 +374,10 @@
         for (String key : keys )
         {
             ReadCommand[] readParameters = new ReadCommand[2];
-            if( start >= 0 && count < Integer.MAX_VALUE)
-            {
-                readParameters[0] = new ReadCommand(tablename, key, 
columnFamily, start, count);
-            }
-            else
-            {
-                readParameters[0] = new ReadCommand(tablename, key, 
columnFamily);
-            }            
-            if( start >= 0 && count < Integer.MAX_VALUE)
-            {
-                readParameters[1] = new ReadCommand(tablename, key, 
columnFamily, start, count);
-            }
-            else
-            {
-                readParameters[1] = new ReadCommand(tablename, key, 
columnFamily);
-            }
+            readParameters[0] = readCommand.copy();
+            readParameters[1] = readCommand.copy();
             readParameters[1].setDigestQuery(true);
+            readMessages.put(key, readParameters);
         }        
         rows = doStrongReadProtocol(readMessages);         
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - 
startTime) + " ms.");
@@ -586,15 +573,15 @@
      * @return a mapping of key --> Row
      * @throws Exception
      */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] 
keys, String columnFamily, List<String> columns) throws Exception
+    public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand 
readCommand) throws Exception
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
         Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
         for ( String key : keys )
         {
-            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, columns);
-            readMessages.put(key, readCommand);
+            ReadCommand readCmd = readCommand.copy();
+            readMessages.put(key, readCmd);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -608,7 +595,7 @@
             /* Remove the local storage endpoint from the list. */
             endpoints.remove( StorageService.getLocalStorageEndPoint() );
             if ( endpoints.size() > 0 && 
DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
columnFamily, columns);
+                StorageService.instance().doConsistencyCheck(row, endpoints, 
readMessages.get(key));
         }
         return rows;
     }
@@ -638,81 +625,4 @@
             StorageService.instance().doConsistencyCheck(row, endpoints, 
command);
         return row;
     }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param start start index
-     * @param count the number of columns we are interested in
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] 
keys, String columnFamily, int start, int count) throws Exception
-    {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, start, count);
-            readMessages.put(key, readCommand);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && 
DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
columnFamily, start, count);
-        }
-        return rows;         
-    }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param sinceTimestamp this is lower bound of the timestamp
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] 
keys, String columnFamily, long sinceTimestamp) throws Exception
-    {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        Map<String, ReadCommand> readMessages = new HashMap<String, 
ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCommand = new ReadCommand(tablename, key, 
columnFamily, sinceTimestamp);
-            readMessages.put(key, readCommand);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = 
StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && 
DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, 
columnFamily, sinceTimestamp);
-        }
-        return rows;         
-    }
 }

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java 
Sat Apr 25 15:46:19 2009
@@ -567,32 +567,10 @@
      */
     public void doConsistencyCheck(Row row, List<EndPoint> endpoints, 
ReadCommand message)
     {
-        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), 
endpoints, message.columnFamilyColumn,
-                                                              message.start, 
message.count, message.sinceTimestamp, message.columnNames);
+        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), 
endpoints, message);
         consistencyManager_.submit(consistencySentinel);
     }
 
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String 
columnFamily, int start, int count)
-       {
-               Runnable consistencySentinel = new 
ConsistencyManager(row.cloneMe(), endpoints, columnFamily, start, count);
-               consistencyManager_.submit(consistencySentinel);
-       }
-
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String 
columnFamily, long sinceTimestamp)
-       {
-               Runnable consistencySentinel = new 
ConsistencyManager(row.cloneMe(), endpoints, columnFamily, sinceTimestamp);
-               consistencyManager_.submit(consistencySentinel);
-       }
-
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String 
columnFamily, List<String> columns)
-    {
-       Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), 
endpoints, columnFamily, columns);
-               consistencyManager_.submit(consistencySentinel);
-    }
-
     /*
      * This method displays all the ranges and the replicas
      * that are responsible for the individual ranges. The

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java 
Sat Apr 25 15:46:19 2009
@@ -42,6 +42,7 @@
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
@@ -880,7 +881,7 @@
                                key = user + ":1";
                        }
 
-                       ReadCommand readCommand = new ReadCommand(tablename_, 
key);
+                       ReadCommand readCommand = new 
RowReadCommand(tablename_, key);
                        Message message = new Message(from_, 
StorageService.readStage_,
                                        StorageService.readVerbHandler_,
                                        new Object[] {readCommand});

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Sat 
Apr 25 15:46:19 2009
@@ -29,6 +29,7 @@
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.ColumnReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -180,7 +181,7 @@
                    String stringKey = new Integer(key).toString();
                    stringKey = stringKey + keyFix_ ;
                int j = random.nextInt(columns) + 1;
-                   ReadCommand rm = new ReadCommand(tablename_, stringKey, 
columnFamilyColumn_ + ":" + columnFix_ + j);
+                   ReadCommand rm = new ColumnReadCommand(tablename_, 
stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
                    readLoad(rm);
                                if ( requestsPerSecond_ > 1000)
                                        Thread.sleep(0, 
1000000000/requestsPerSecond_);
@@ -250,7 +251,7 @@
                    stringKey = stringKey + keyFix_ ;
                int i = random.nextInt(superColumns) + 1;
                int j = random.nextInt(columns) + 1;
-                   ReadCommand rm = new ReadCommand(tablename_, stringKey, 
columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+                   ReadCommand rm = new ColumnReadCommand(tablename_, 
stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + 
columnFix_ + j);
                    readLoad(rm);
                        }
                }

Modified: 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java 
(original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java 
Sat Apr 25 15:46:19 2009
@@ -18,10 +18,27 @@
         ArrayList<String> colList = new ArrayList<String>();
         colList.add("col1");
         colList.add("col2");
+        
+        ReadCommand rm, rm2;
+        
+        rm = new SliceByNamesReadCommand("Table1", "row1", "foo", colList);
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
+
+        rm = new ColumnReadCommand("Table1", "row1", "foo:col1");
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
 
-        ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
-        ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
+        rm = new RowReadCommand("Table1", "row1");
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
+
+        rm = new ColumnsSinceReadCommand("Table1", "row1", "foo", 1);
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
 
+        rm = new SliceReadCommand("Table1", "row1", "foo", 1, 2);
+        rm2 = serializeAndDeserializeReadMessage(rm);
         assert rm2.toString().equals(rm.toString());
     }
 
@@ -56,7 +73,7 @@
         rm.add("Standard1:Column1", "abcd".getBytes(), 0);
         rm.apply();
 
-        ReadCommand command = new ReadCommand("Table1", "key1", 
"Standard1:Column1", -1, Integer.MAX_VALUE);
+        ReadCommand command = new ColumnReadCommand("Table1", "key1", 
"Standard1:Column1");
         Row row = command.getRow(table);
         ColumnFamily cf = row.getColumnFamily("Standard1");
         IColumn col = cf.getColumn("Column1");


Reply via email to