Author: jbellis
Date: Mon Apr 20 20:05:07 2009
New Revision: 766838
URL: http://svn.apache.org/viewvc?rev=766838&view=rev
Log:
rename get_cf -> readColumnFamily; ReadMessage -> ReadCommand.
[Message message = ReadMessage.readMessage(readMessage) is just plain confusing]
patch by jbellis; reviewed by Eric Evans for #88
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
- copied, changed from r766140,
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Removed:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
Copied: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
(from r766140,
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?p2=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java&p1=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java&r1=766140&r2=766838&rev=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Mon Apr 20 20:05:07 2009
@@ -28,7 +28,6 @@
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
@@ -38,26 +37,26 @@
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
*/
-public class ReadMessage implements Serializable
+public class ReadCommand implements Serializable
{
- private static ICompactSerializer<ReadMessage> serializer_;
+ private static ICompactSerializer<ReadCommand> serializer_;
public static final String doRepair_ = "READ-REPAIR";
-
+
static
{
- serializer_ = new ReadMessageSerializer();
+ serializer_ = new ReadCommandSerializer();
}
- static ICompactSerializer<ReadMessage> serializer()
+ static ICompactSerializer<ReadCommand> serializer()
{
return serializer_;
}
- public static Message makeReadMessage(ReadMessage readMessage) throws
IOException
+ public static Message makeReadMessage(ReadCommand readCommand) throws
IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- ReadMessage.serializer().serialize(readMessage, dos);
+ ReadCommand.serializer().serialize(readCommand, dos);
Message message = new
Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_,
StorageService.readVerbHandler_, new Object[]{bos.toByteArray()});
return message;
}
@@ -71,24 +70,24 @@
private List<String> columns_ = new ArrayList<String>();
private boolean isDigestQuery_ = false;
- private ReadMessage()
+ private ReadCommand()
{
}
- public ReadMessage(String table, String key)
+ public ReadCommand(String table, String key)
{
table_ = table;
key_ = key;
}
- public ReadMessage(String table, String key, String columnFamily_column)
+ public ReadCommand(String table, String key, String columnFamily_column)
{
table_ = table;
key_ = key;
columnFamily_column_ = columnFamily_column;
}
- public ReadMessage(String table, String key, String columnFamily,
List<String> columns)
+ public ReadCommand(String table, String key, String columnFamily,
List<String> columns)
{
table_ = table;
key_ = key;
@@ -96,7 +95,7 @@
columns_ = columns;
}
- public ReadMessage(String table, String key, String columnFamily_column,
int start, int count)
+ public ReadCommand(String table, String key, String columnFamily_column,
int start, int count)
{
table_ = table;
key_ = key;
@@ -105,7 +104,7 @@
count_ = count;
}
- public ReadMessage(String table, String key, String columnFamily_column,
long sinceTimestamp)
+ public ReadCommand(String table, String key, String columnFamily_column,
long sinceTimestamp)
{
table_ = table;
key_ = key;
@@ -173,9 +172,9 @@
}
}
-class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
{
- public void serialize(ReadMessage rm, DataOutputStream dos) throws
IOException
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws
IOException
{
dos.writeUTF(rm.table());
dos.writeUTF(rm.key());
@@ -196,7 +195,7 @@
}
}
- public ReadMessage deserialize(DataInputStream dis) throws IOException
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
{
String table = dis.readUTF();
String key = dis.readUTF();
@@ -214,18 +213,18 @@
dis.readFully(bytes);
columns.add( new String(bytes) );
}
- ReadMessage rm = null;
+ ReadCommand rm = null;
if ( columns.size() > 0 )
{
- rm = new ReadMessage(table, key, columnFamily_column,
columns);
+ rm = new ReadCommand(table, key, columnFamily_column,
columns);
}
else if( sinceTimestamp > 0 )
{
- rm = new ReadMessage(table, key, columnFamily_column,
sinceTimestamp);
+ rm = new ReadCommand(table, key, columnFamily_column,
sinceTimestamp);
}
else
{
- rm = new ReadMessage(table, key, columnFamily_column,
start, count);
+ rm = new ReadCommand(table, key, columnFamily_column,
start, count);
}
rm.setIsDigestQuery(isDigest);
return rm;
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Mon Apr 20 20:05:07 2009
@@ -19,12 +19,9 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.EndPoint;
@@ -34,8 +31,6 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
@@ -77,30 +72,30 @@
try
{
- ReadMessage readMessage =
ReadMessage.serializer().deserialize(readCtx.bufIn_);
- Table table = Table.open(readMessage.table());
+ ReadCommand readCommand =
ReadCommand.serializer().deserialize(readCtx.bufIn_);
+ Table table = Table.open(readCommand.table());
Row row = null;
long start = System.currentTimeMillis();
- if( readMessage.columnFamily_column() == null )
- row = table.get(readMessage.key());
+ if( readCommand.columnFamily_column() == null )
+ row = table.get(readCommand.key());
else
{
- if(readMessage.getColumnNames().size() == 0)
+ if(readCommand.getColumnNames().size() == 0)
{
- if(readMessage.count() > 0 && readMessage.start() >= 0)
- row = table.getRow(readMessage.key(),
readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+ if(readCommand.count() > 0 && readCommand.start() >= 0)
+ row = table.getRow(readCommand.key(),
readCommand.columnFamily_column(), readCommand.start(), readCommand.count());
else
- row = table.getRow(readMessage.key(),
readMessage.columnFamily_column());
+ row = table.getRow(readCommand.key(),
readCommand.columnFamily_column());
}
else
{
- row = table.getRow(readMessage.key(),
readMessage.columnFamily_column(), readMessage.getColumnNames());
+ row = table.getRow(readCommand.key(),
readCommand.columnFamily_column(), readCommand.getColumnNames());
}
}
logger_.info("getRow() TIME: " + (System.currentTimeMillis() -
start) + " ms.");
start = System.currentTimeMillis();
ReadResponseMessage readResponseMessage = null;
- if(readMessage.isDigestQuery())
+ if(readCommand.isDigestQuery())
{
readResponseMessage = new
ReadResponseMessage(table.getTableName(), row.digest());
}
@@ -108,7 +103,7 @@
{
readResponseMessage = new
ReadResponseMessage(table.getTableName(), row);
}
- readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+ readResponseMessage.setIsDigestQuery(readCommand.isDigestQuery());
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
@@ -126,9 +121,9 @@
logger_.info("ReadVerbHandler TIME 2: " +
(System.currentTimeMillis() - start) + " ms.");
/* Do read repair if header of the message says so */
- String repair = new String(
message.getHeader(ReadMessage.doRepair_) );
- if ( repair.equals( ReadMessage.doRepair_ ) )
- doReadRepair(row, readMessage);
+ String repair = new String(
message.getHeader(ReadCommand.doRepair_) );
+ if ( repair.equals( ReadCommand.doRepair_ ) )
+ doReadRepair(row, readCommand);
}
catch ( IOException ex)
{
@@ -140,29 +135,29 @@
}
}
- private void doReadRepair(Row row, ReadMessage readMessage)
+ private void doReadRepair(Row row, ReadCommand readCommand)
{
if ( DatabaseDescriptor.getConsistencyCheck() )
{
- List<EndPoint> endpoints =
StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+ List<EndPoint> endpoints =
StorageService.instance().getNLiveStorageEndPoint(readCommand.key());
/* Remove the local storage endpoint from the list. */
endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if(readMessage.getColumnNames().size() == 0)
+ if(readCommand.getColumnNames().size() == 0)
{
- if( readMessage.start() >= 0 && readMessage.count() <
Integer.MAX_VALUE)
+ if( readCommand.start() >= 0 && readCommand.count() <
Integer.MAX_VALUE)
{
- StorageService.instance().doConsistencyCheck(row,
endpoints, readMessage.columnFamily_column(), readMessage.start(),
readMessage.count());
+ StorageService.instance().doConsistencyCheck(row,
endpoints, readCommand.columnFamily_column(), readCommand.start(),
readCommand.count());
}
- if( readMessage.sinceTimestamp() > 0)
+ if( readCommand.sinceTimestamp() > 0)
{
- StorageService.instance().doConsistencyCheck(row,
endpoints, readMessage.columnFamily_column(), readMessage.sinceTimestamp());
+ StorageService.instance().doConsistencyCheck(row,
endpoints, readCommand.columnFamily_column(), readCommand.sinceTimestamp());
}
}
else
{
- StorageService.instance().doConsistencyCheck(row, endpoints,
readMessage.columnFamily_column(), readMessage.getColumnNames());
+ StorageService.instance().doConsistencyCheck(row, endpoints,
readCommand.columnFamily_column(), readCommand.getColumnNames());
}
}
}
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Mon Apr 20 20:05:07 2009
@@ -37,7 +37,6 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Column;
import org.apache.cassandra.utils.LogUtil;
import org.apache.thrift.TException;
@@ -81,7 +80,7 @@
}
}
- protected ColumnFamily get_cf(String tablename, String key, String
columnFamily, List<String> columNames) throws CassandraException, TException
+ protected ColumnFamily readColumnFamily(String tablename, String key,
String columnFamily, List<String> columNames) throws CassandraException,
TException
{
ColumnFamily cfamily = null;
try
@@ -205,7 +204,7 @@
try
{
validateTable(tablename);
- ColumnFamily cfamily = get_cf(tablename, key,
columnFamily, columnNames);
+ ColumnFamily cfamily = readColumnFamily(tablename, key,
columnFamily, columnNames);
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " +
columnFamily + " is missing.....: "
@@ -486,7 +485,7 @@
try
{
validateTable(tablename);
- ColumnFamily cfamily = get_cf(tablename, key,
columnFamily, superColumnNames);
+ ColumnFamily cfamily = readColumnFamily(tablename, key,
columnFamily, superColumnNames);
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " +
columnFamily + " is missing.....: "+" key:" + key
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Mon Apr 20 20:05:07 2009
@@ -22,13 +22,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.io.DataInputBuffer;
@@ -96,9 +92,9 @@
replicas_.add(StorageService.getLocalStorageEndPoint());
IAsyncCallback responseHandler = new
DataRepairHandler(ConsistencyManager.this.replicas_.size(),
readResponseResolver);
String table = DatabaseDescriptor.getTables().get(0);
- ReadMessage readMessage = constructReadMessage(false);
+ ReadCommand readCommand = constructReadMessage(false);
// ReadMessage readMessage = new ReadMessage(table,
row_.key(), columnFamily_);
- Message message = ReadMessage.makeReadMessage(readMessage);
+ Message message = ReadCommand.makeReadMessage(readCommand);
MessagingService.getMessagingInstance().sendRR(message,
replicas_.toArray( new EndPoint[0] ), responseHandler);
}
}
@@ -187,10 +183,10 @@
public void run()
{
logger_.debug(" Run the consistency checks for " +
columnFamily_);
- ReadMessage readMessageDigestOnly = constructReadMessage(true);
+ ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
- Message messageDigestOnly =
ReadMessage.makeReadMessage(readMessageDigestOnly);
+ Message messageDigestOnly =
ReadCommand.makeReadMessage(readCommandDigestOnly);
IAsyncCallback digestResponseHandler = new
DigestResponseHandler();
MessagingService.getMessagingInstance().sendRR(messageDigestOnly,
replicas_.toArray(new EndPoint[0]), digestResponseHandler);
}
@@ -200,32 +196,32 @@
}
}
- private ReadMessage constructReadMessage(boolean isDigestQuery)
+ private ReadCommand constructReadMessage(boolean isDigestQuery)
{
- ReadMessage readMessage = null;
+ ReadCommand readCommand = null;
String table = DatabaseDescriptor.getTables().get(0);
if(columnNames_.size() == 0)
{
if( start_ >= 0 && count_ < Integer.MAX_VALUE)
{
- readMessage = new ReadMessage(table, row_.key(),
columnFamily_, start_, count_);
+ readCommand = new ReadCommand(table, row_.key(),
columnFamily_, start_, count_);
}
else if(sinceTimestamp_ > 0)
{
- readMessage = new ReadMessage(table, row_.key(),
columnFamily_, sinceTimestamp_);
+ readCommand = new ReadCommand(table, row_.key(),
columnFamily_, sinceTimestamp_);
}
else
{
- readMessage = new ReadMessage(table, row_.key(),
columnFamily_);
+ readCommand = new ReadCommand(table, row_.key(),
columnFamily_);
}
}
else
{
- readMessage = new ReadMessage(table, row_.key(), columnFamily_,
columnNames_);
+ readCommand = new ReadCommand(table, row_.key(), columnFamily_,
columnNames_);
}
- readMessage.setIsDigestQuery(isDigestQuery);
- return readMessage;
+ readCommand.setIsDigestQuery(isDigestQuery);
+ return readCommand;
}
}
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java Mon Apr 20 20:05:07 2009
@@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IAsyncCallback;
@@ -47,7 +47,7 @@
private Lock lock_ = new ReentrantLock();
private Condition condition_;
/* This maps the keys to the original data read messages */
- private Map<String, ReadMessage> readMessages_ = new HashMap<String,
ReadMessage>();
+ private Map<String, ReadCommand> readMessages_ = new HashMap<String,
ReadCommand>();
/* This maps the key to its set of replicas */
private Map<String, EndPoint[]> endpoints_ = new HashMap<String,
EndPoint[]>();
/* This maps the groupId to the individual callback for the set of
messages */
@@ -129,10 +129,10 @@
{
if ( DatabaseDescriptor.getConsistencyCheck())
{
- ReadMessage readMessage = readMessages_.get(key);
- readMessage.setIsDigestQuery(false);
- Message messageRepair =
ReadMessage.makeReadMessage(readMessage);
- EndPoint[] endpoints =
MultiQuorumResponseHandler.this.endpoints_.get( readMessage.key() );
+ ReadCommand readCommand = readMessages_.get(key);
+ readCommand.setIsDigestQuery(false);
+ Message messageRepair =
ReadCommand.makeReadMessage(readCommand);
+ EndPoint[] endpoints =
MultiQuorumResponseHandler.this.endpoints_.get( readCommand.key() );
Message[][] messages = new Message[][]{ {messageRepair,
messageRepair, messageRepair} };
EndPoint[][] epList = new EndPoint[][]{ endpoints };
MessagingService.getMessagingInstance().sendRR(messages,
epList, MultiQuorumResponseHandler.this);
@@ -140,7 +140,7 @@
}
}
- public MultiQuorumResponseHandler(Map<String, ReadMessage> readMessages,
Map<String, EndPoint[]> endpoints)
+ public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages,
Map<String, EndPoint[]> endpoints)
{
condition_ = lock_.newCondition();
readMessages_ = readMessages;
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Mon Apr 20 20:05:07 2009
@@ -30,16 +30,14 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TouchMessage;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -145,13 +143,13 @@
}
}
- private static Map<String, Message> constructMessages(Map<String,
ReadMessage> readMessages) throws IOException
+ private static Map<String, Message> constructMessages(Map<String,
ReadCommand> readMessages) throws IOException
{
Map<String, Message> messages = new HashMap<String, Message>();
Set<String> keys = readMessages.keySet();
for ( String key : keys )
{
- Message message = ReadMessage.makeReadMessage(
readMessages.get(key) );
+ Message message = ReadCommand.makeReadMessage(
readMessages.get(key) );
messages.put(key, message);
}
return messages;
@@ -182,7 +180,7 @@
* @throws IOException
* @throws TimeoutException
*/
- public static Map<String, Row> doReadProtocol(Map<String, ReadMessage>
readMessages) throws IOException,TimeoutException
+ public static Map<String, Row> doReadProtocol(Map<String, ReadCommand>
readMessages) throws IOException,TimeoutException
{
Map<String, Row> rows = new HashMap<String, Row>();
Set<String> keys = readMessages.keySet();
@@ -206,14 +204,14 @@
return rows;
}
- public static Row doReadProtocol(String key, ReadMessage readMessage)
throws IOException,TimeoutException
+ public static Row doReadProtocol(String key, ReadCommand readCommand)
throws IOException,TimeoutException
{
Row row = null;
EndPoint endPoint =
StorageService.instance().findSuitableEndPoint(key);
if(endPoint != null)
{
- Message message = ReadMessage.makeReadMessage(readMessage);
- message.addHeader(ReadMessage.doRepair_,
ReadMessage.doRepair_.getBytes());
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ message.addHeader(ReadCommand.doRepair_,
ReadCommand.doRepair_.getBytes());
IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(message, endPoint);
Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
byte[] body = (byte[])result[0];
@@ -310,9 +308,9 @@
}
if(!foundLocal && consistencyLevel ==
StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily,
columnNames);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily,
columnNames);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -349,9 +347,9 @@
}
if(!foundLocal && consistencyLevel ==
StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily, start,
count);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily, start,
count);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -408,9 +406,9 @@
}
if(!foundLocal && consistencyLevel ==
StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily,
sinceTimestamp);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily,
sinceTimestamp);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -436,12 +434,12 @@
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = new ReadMessage(tablename, key,
columnFamily, columns);
+ ReadCommand readCommand = new ReadCommand(tablename, key,
columnFamily, columns);
- ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key,
columnFamily, columns);
- readMessageDigestOnly.setIsDigestQuery(true);
+ ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key,
columnFamily, columns);
+ readCommandDigestOnly.setIsDigestQuery(true);
- Row row = StorageProxy.doStrongReadProtocol(key, readMessage,
readMessageDigestOnly);
+ Row row = StorageProxy.doStrongReadProtocol(key, readCommand,
readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() -
startTime) + " ms.");
return row;
}
@@ -466,27 +464,27 @@
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = null;
- ReadMessage readMessageDigestOnly = null;
+ ReadCommand readCommand = null;
+ ReadCommand readCommandDigestOnly = null;
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage = new ReadMessage(tablename, key, columnFamily, start,
count);
+ readCommand = new ReadCommand(tablename, key, columnFamily, start,
count);
}
else
{
- readMessage = new ReadMessage(tablename, key, columnFamily);
+ readCommand = new ReadCommand(tablename, key, columnFamily);
}
- Message message = ReadMessage.makeReadMessage(readMessage);
+ Message message = ReadCommand.makeReadMessage(readCommand);
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessageDigestOnly = new ReadMessage(tablename, key,
columnFamily, start, count);
+ readCommandDigestOnly = new ReadCommand(tablename, key,
columnFamily, start, count);
}
else
{
- readMessageDigestOnly = new ReadMessage(tablename, key,
columnFamily);
+ readCommandDigestOnly = new ReadCommand(tablename, key,
columnFamily);
}
- readMessageDigestOnly.setIsDigestQuery(true);
- Row row = doStrongReadProtocol(key, readMessage,
readMessageDigestOnly);
+ readCommandDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readCommand,
readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() -
startTime) + " ms.");
return row;
}
@@ -507,27 +505,27 @@
Map<String, Row> rows = new HashMap<String, Row>();
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- Map<String, ReadMessage[]> readMessages = new HashMap<String,
ReadMessage[]>();
+ Map<String, ReadCommand[]> readMessages = new HashMap<String,
ReadCommand[]>();
for (String key : keys )
{
- ReadMessage[] readMessage = new ReadMessage[2];
+ ReadCommand[] readCommand = new ReadCommand[2];
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage[0] = new ReadMessage(tablename, key, columnFamily,
start, count);
+ readCommand[0] = new ReadCommand(tablename, key, columnFamily,
start, count);
}
else
{
- readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+ readCommand[0] = new ReadCommand(tablename, key, columnFamily);
}
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage[1] = new ReadMessage(tablename, key, columnFamily,
start, count);
+ readCommand[1] = new ReadCommand(tablename, key, columnFamily,
start, count);
}
else
{
- readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+ readCommand[1] = new ReadCommand(tablename, key, columnFamily);
}
- readMessage[1].setIsDigestQuery(true);
+ readCommand[1].setIsDigestQuery(true);
}
rows = doStrongReadProtocol(readMessages);
logger_.debug("readProtocol: " + (System.currentTimeMillis() -
startTime) + " ms.");
@@ -538,13 +536,13 @@
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = null;
- ReadMessage readMessageDigestOnly = null;
- readMessage = new ReadMessage(tablename, key, columnFamily,
sinceTimestamp);
- Message message = ReadMessage.makeReadMessage(readMessage);
- readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily,
sinceTimestamp);
- readMessageDigestOnly.setIsDigestQuery(true);
- Row row = doStrongReadProtocol(key, readMessage,
readMessageDigestOnly);
+ ReadCommand readCommand = null;
+ ReadCommand readCommandDigestOnly = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily,
sinceTimestamp);
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily,
sinceTimestamp);
+ readCommandDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readCommand,
readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() -
startTime) + " ms.");
return row;
}
@@ -555,11 +553,11 @@
* param @ readMessage - the read message to get the actual data
* param @ readMessageDigest - the read message to get the digest.
*/
- private static Row doStrongReadProtocol(String key, ReadMessage
readMessage, ReadMessage readMessageDigest) throws IOException, TimeoutException
+ private static Row doStrongReadProtocol(String key, ReadCommand
readCommand, ReadCommand readCommandDigest) throws IOException, TimeoutException
{
Row row = null;
- Message message = ReadMessage.makeReadMessage(readMessage);
- Message messageDigestOnly =
ReadMessage.makeReadMessage(readMessageDigest);
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ Message messageDigestOnly =
ReadCommand.makeReadMessage(readCommandDigest);
IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(
@@ -607,9 +605,9 @@
QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(
DatabaseDescriptor.getReplicationFactor(),
readResponseResolverRepair);
- readMessage.setIsDigestQuery(false);
+ readCommand.setIsDigestQuery(false);
logger_.info("DigestMismatchException: " + key);
- Message messageRepair =
ReadMessage.makeReadMessage(readMessage);
+ Message messageRepair =
ReadCommand.makeReadMessage(readCommand);
MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
quorumResponseHandlerRepair);
try
{
@@ -628,7 +626,7 @@
return row;
}
- private static Map<String, Message[]> constructReplicaMessages(Map<String,
ReadMessage[]> readMessages) throws IOException
+ private static Map<String, Message[]> constructReplicaMessages(Map<String,
ReadCommand[]> readMessages) throws IOException
{
Map<String, Message[]> messages = new HashMap<String, Message[]>();
Set<String> keys = readMessages.keySet();
@@ -636,21 +634,21 @@
for ( String key : keys )
{
Message[] msg = new
Message[DatabaseDescriptor.getReplicationFactor()];
- ReadMessage[] readMessage = readMessages.get(key);
- msg[0] = ReadMessage.makeReadMessage( readMessage[0] );
+ ReadCommand[] readCommand = readMessages.get(key);
+ msg[0] = ReadCommand.makeReadMessage( readCommand[0] );
for ( int i = 1; i < msg.length; ++i )
{
- msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+ msg[i] = ReadCommand.makeReadMessage( readCommand[1] );
}
}
return messages;
}
- private static MultiQuorumResponseHandler dispatchMessages(Map<String,
ReadMessage[]> readMessages, Map<String, Message[]> messages) throws IOException
+ private static MultiQuorumResponseHandler dispatchMessages(Map<String,
ReadCommand[]> readMessages, Map<String, Message[]> messages) throws IOException
{
Set<String> keys = messages.keySet();
/* This maps the keys to the original data read messages */
- Map<String, ReadMessage> readMessage = new HashMap<String,
ReadMessage>();
+ Map<String, ReadCommand> readMessage = new HashMap<String,
ReadCommand>();
/* This maps the keys to their respective endpoints/replicas */
Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
/* Groups the messages that need to be sent to the individual keys */
@@ -700,7 +698,7 @@
* @return map containing key ---> Row
* @throws IOException, TimeoutException
*/
- private static Map<String, Row> doStrongReadProtocol(Map<String,
ReadMessage[]> readMessages) throws IOException
+ private static Map<String, Row> doStrongReadProtocol(Map<String,
ReadCommand[]> readMessages) throws IOException
{
Map<String, Row> rows = new HashMap<String, Row>();
/* Construct the messages to be sent to the replicas */
@@ -769,11 +767,11 @@
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String,
ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String,
ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key,
columnFamily, columns);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key,
columnFamily, columns);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
@@ -850,11 +848,11 @@
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String,
ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String,
ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key,
columnFamily, start, count);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key,
columnFamily, start, count);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
@@ -919,11 +917,11 @@
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String,
ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String,
ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key,
columnFamily, sinceTimestamp);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key,
columnFamily, sinceTimestamp);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
Mon Apr 20 20:05:07 2009
@@ -43,7 +43,7 @@
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
@@ -880,10 +880,10 @@
key = user + ":1";
}
- ReadMessage readMessage = new ReadMessage(tablename_,
key);
+ ReadCommand readCommand = new ReadCommand(tablename_,
key);
Message message = new Message(from_,
StorageService.readStage_,
StorageService.readVerbHandler_,
- new Object[] { readMessage });
+ new Object[] {readCommand});
IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(
message, to_);
Object[] result = iar.get();
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Mon
Apr 20 20:05:07 2009
@@ -19,10 +19,7 @@
package org.apache.cassandra.test;
import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
@@ -30,17 +27,13 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.analytics.AnalyticsContext;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.Cassandra;
@@ -147,7 +140,7 @@
}
- public void readLoad(ReadMessage readMessage)
+ public void readLoad(ReadCommand readCommand)
{
IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(
@@ -155,7 +148,7 @@
readResponseResolver);
Message message = new Message(from_, StorageService.readStage_,
StorageService.readVerbHandler_,
- new Object[] { readMessage });
+ new Object[] {readCommand});
MessagingService.getMessagingInstance().sendOneWay(message,
to_);
/*IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(message, to_);
try
@@ -187,7 +180,7 @@
String stringKey = new Integer(key).toString();
stringKey = stringKey + keyFix_ ;
int j = random.nextInt(columns) + 1;
- ReadMessage rm = new ReadMessage(tablename_, stringKey,
columnFamilyColumn_ + ":" + columnFix_ + j);
+ ReadCommand rm = new ReadCommand(tablename_, stringKey,
columnFamilyColumn_ + ":" + columnFix_ + j);
readLoad(rm);
if ( requestsPerSecond_ > 1000)
Thread.sleep(0,
1000000000/requestsPerSecond_);
@@ -257,7 +250,7 @@
stringKey = stringKey + keyFix_ ;
int i = random.nextInt(superColumns) + 1;
int j = random.nextInt(columns) + 1;
- ReadMessage rm = new ReadMessage(tablename_, stringKey,
columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+ ReadCommand rm = new ReadCommand(tablename_, stringKey,
columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
readLoad(rm);
}
}
Modified:
incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
(original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
Mon Apr 20 20:05:07 2009
@@ -16,16 +16,16 @@
colList.add("col1");
colList.add("col2");
- ReadMessage rm = new ReadMessage("Table1", "row1", "foo", colList);
- ReadMessage rm2 = serializeAndDeserializeReadMessage(rm);
+ ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
+ ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
}
- private ReadMessage serializeAndDeserializeReadMessage(ReadMessage rm)
+ private ReadCommand serializeAndDeserializeReadMessage(ReadCommand rm)
{
- ReadMessage rm2 = null;
- ReadMessageSerializer rms = (ReadMessageSerializer)
ReadMessage.serializer();
+ ReadCommand rm2 = null;
+ ReadCommandSerializer rms = (ReadCommandSerializer)
ReadCommand.serializer();
DataOutputBuffer dos = new DataOutputBuffer();
DataInputBuffer dis = new DataInputBuffer();