Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Thu Aug 27 20:07:12 2009 @@ -36,10 +36,12 @@ private AtomicBoolean done_ = new AtomicBoolean(false); private Lock lock_ = new ReentrantLock(); private Condition condition_; + private long startTime_; public AsyncResult() { condition_ = lock_.newCondition(); + startTime_ = System.currentTimeMillis(); } public byte[] get() @@ -77,8 +79,12 @@ try { if ( !done_.get() ) - { - bVal = condition_.await(timeout, tu); + { + long overall_timeout = System.currentTimeMillis() - startTime_ + timeout; + if(overall_timeout > 0) + bVal = condition_.await(overall_timeout, TimeUnit.MILLISECONDS); + else + bVal = false; } } catch ( InterruptedException ex ) @@ -97,17 +103,7 @@ } return result_; } - - public List<byte[]> multiget() - { - throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction."); - } - - public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException - { - throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction."); - } - + public void result(Message response) { try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Thu Aug 27 20:07:12 2009 @@ -45,22 +45,7 @@ * @return the result wrapped in an Object[] */ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException; - - /** - * Returns the result for all tasks that was submitted. - * @return the list of results wrapped in an Object[] - */ - public List<byte[]> multiget(); - - /** - * Same operation as the above get() but allows the calling - * thread to specify a timeout. - * @param timeout the maximum time to wait - * @param tu the time unit of the timeout argument - * @return the result wrapped in an Object[] - */ - public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException; - + /** * Store the result obtained for the submitted task. * @param result the response message Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java Thu Aug 27 20:07:12 2009 @@ -119,19 +119,6 @@ * array is sent to the ith element in the <code>to</code> array.This method assumes * there is a one-one mapping between the <code>messages</code> array and * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown. - * This method also informs the MessagingService to wait for at least - * <code>howManyResults</code> responses to determine success of failure. - * @param messages messages to be sent. - * @param to endpoints to which the message needs to be sent - * @return an reference to IAsyncResult - */ - public IAsyncResult sendRR(Message[] messages, EndPoint[] to); - - /** - * Send a message to a given endpoint. The ith element in the <code>messages</code> - * array is sent to the ith element in the <code>to</code> array.This method assumes - * there is a one-one mapping between the <code>messages</code> array and - * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown. * The idea is that multi-groups of messages are grouped as one logical message * whose results are harnessed via the <i>IAsyncResult</i> * @param messages groups of grouped messages. Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 27 20:07:12 2009 @@ -379,25 +379,6 @@ return groupId; } - public IAsyncResult sendRR(Message[] messages, EndPoint[] to) - { - if ( messages.length != to.length ) - { - throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same."); - } - - IAsyncResult iar = new MultiAsyncResult(messages.length); - String groupId = GuidGenerator.guid(); - taskCompletionMap_.put(groupId, iar); - for ( int i = 0; i < messages.length; ++i ) - { - messages[i].setMessageId(groupId); - sendOneWay(messages[i], to[i]); - } - - return iar; - } - public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb) { if ( messages.length != to.length ) Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Thu Aug 27 20:07:12 2009 @@ -36,7 +36,6 @@ import org.apache.cassandra.net.EndPoint; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.LogUtil; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.thrift.TException; @@ -73,10 +72,17 @@ storageService.start(); } - protected ColumnFamily readColumnFamily(ReadCommand command, int consistency_level) throws InvalidRequestException + protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException { - String cfName = command.getColumnFamilyName(); - ThriftValidation.validateKey(command.key); + // TODO - Support multiple column families per row, right now row only contains 1 column family + String cfName = commands.get(0).getColumnFamilyName(); + + Map<String, ColumnFamily> columnFamilyKeyMap = new HashMap<String,ColumnFamily>(); + + for (ReadCommand command: commands) + { + ThriftValidation.validateKey(command.key); + } if (consistency_level == ConsistencyLevel.ZERO) { @@ -87,10 +93,10 @@ throw new InvalidRequestException("Consistency level all is not yet supported on read operations"); } - Row row; + List<Row> rows; try { - row = StorageProxy.readProtocol(command, consistency_level); + rows = StorageProxy.readProtocol(commands, consistency_level); } catch (IOException e) { @@ -101,11 +107,11 @@ throw new RuntimeException(e); } - if (row == null) + for (Row row: rows) { - return null; + columnFamilyKeyMap.put(row.key(), row.getColumnFamily(cfName)); } - return row.getColumnFamily(cfName); + return columnFamilyKeyMap; } public List<Column> thriftifySubColumns(Collection<IColumn> columns) @@ -169,50 +175,78 @@ return thriftSuperColumns; } - private List<ColumnOrSuperColumn> getSlice(ReadCommand command, int consistency_level) throws InvalidRequestException + private Map<String, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException { - ColumnFamily cfamily = readColumnFamily(command, consistency_level); - boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed; - - if (cfamily == null || cfamily.getColumnsMap().size() == 0) - { - return EMPTY_COLUMNS; - } - if (command.queryPath.superColumnName != null) + Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level); + Map<String, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<String, List<ColumnOrSuperColumn>>(); + for (ReadCommand command: commands) { - IColumn column = cfamily.getColumnsMap().values().iterator().next(); - Collection<IColumn> subcolumns = column.getSubColumns(); - if (subcolumns == null || subcolumns.isEmpty()) + ColumnFamily cfamily = cfamilies.get(command.key); + boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed; + + if (cfamily == null || cfamily.getColumnsMap().size() == 0) { - return EMPTY_COLUMNS; + columnFamiliesMap.put(command.key, EMPTY_COLUMNS); + continue; } - return thriftifyColumns(subcolumns, reverseOrder); - } - if (cfamily.isSuper()) - { - return thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder); + if (command.queryPath.superColumnName != null) + { + IColumn column = cfamily.getColumnsMap().values().iterator().next(); + Collection<IColumn> subcolumns = column.getSubColumns(); + if (subcolumns == null || subcolumns.isEmpty()) + { + columnFamiliesMap.put(command.key, EMPTY_COLUMNS); + continue; + } + columnFamiliesMap.put(command.key, thriftifyColumns(subcolumns, reverseOrder)); + continue; + } + if (cfamily.isSuper()) + columnFamiliesMap.put(command.key, thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder)); + else + columnFamiliesMap.put(command.key, thriftifyColumns(cfamily.getSortedColumns(), reverseOrder)); } - return thriftifyColumns(cfamily.getSortedColumns(), reverseOrder); + + return columnFamiliesMap; } public List<ColumnOrSuperColumn> get_slice(String keyspace, String key, ColumnParent column_parent, SlicePredicate predicate, int consistency_level) throws InvalidRequestException, NotFoundException { if (logger.isDebugEnabled()) - logger.debug("get_slice_from"); + logger.debug("get_slice"); + return multigetSliceInternal(keyspace, Arrays.asList(key), column_parent, predicate, consistency_level).get(key); + } + + public Map<String, List<ColumnOrSuperColumn>> multiget_slice(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level) + throws InvalidRequestException + { + if (logger.isDebugEnabled()) + logger.debug("multiget_slice"); + return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level); + } + + private Map<String, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level) + throws InvalidRequestException + { ThriftValidation.validateColumnParent(keyspace, column_parent); + List<ReadCommand> commands = new ArrayList<ReadCommand>(); + SliceRange range = predicate.slice_range; if (predicate.column_names != null) { + for (String key: keys) + commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names)); ThriftValidation.validateColumns(keyspace, column_parent, predicate.column_names); - return getSlice(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names), consistency_level); } else { - SliceRange range = predicate.slice_range; + for (String key: keys) + commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count)); ThriftValidation.validateRange(keyspace, column_parent, range); - return getSlice(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count), consistency_level); } + + return getSlice(commands, consistency_level); } public ColumnOrSuperColumn get(String table, String key, ColumnPath column_path, int consistency_level) @@ -220,50 +254,127 @@ { if (logger.isDebugEnabled()) logger.debug("get"); - ThriftValidation.validateColumnPath(table, column_path); - - QueryPath path = new QueryPath(column_path.column_family, column_path.super_column); - List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column); - ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(table, key, path, nameAsList), consistency_level); - if (cfamily == null) + ColumnOrSuperColumn column = multiget(table, Arrays.asList(key), column_path, consistency_level).get(key); + if (!column.isSetColumn() && !column.isSetSuper_column()) { throw new NotFoundException(); } - Collection<IColumn> columns = null; - if (column_path.super_column != null && column_path.column != null) + return column; + } + + /** no values will be mapped to keys with no data */ + private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand> commands, int consistency_level) + throws InvalidRequestException + { + Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level); + Map<String, Collection<IColumn>> columnFamiliesMap = new HashMap<String, Collection<IColumn>>(); + + for (ReadCommand command: commands) { - IColumn column = cfamily.getColumn(column_path.super_column); - if (column != null) + ColumnFamily cfamily = cfamilies.get(command.key); + if (cfamily == null) + continue; + + Collection<IColumn> columns = null; + if (command.queryPath.superColumnName != null) { - columns = column.getSubColumns(); + IColumn column = cfamily.getColumn(command.queryPath.superColumnName); + if (column != null) + { + columns = column.getSubColumns(); + } + } + else + { + columns = cfamily.getSortedColumns(); + } + + if (columns != null && columns.size() != 0) + { + columnFamiliesMap.put(command.key, columns); } } - else - { - columns = cfamily.getSortedColumns(); - } - if (columns == null || columns.size() == 0) + return columnFamiliesMap; + } + + /** always returns a ColumnOrSuperColumn for each key, even if there is no data for it */ + public Map<String, ColumnOrSuperColumn> multiget(String table, List<String> keys, ColumnPath column_path, int consistency_level) + throws InvalidRequestException + { + if (logger.isDebugEnabled()) + logger.debug("multiget"); + return multigetInternal(table, keys, column_path, consistency_level); + } + + private Map<String, ColumnOrSuperColumn> multigetInternal(String table, List<String> keys, ColumnPath column_path, int consistency_level) + throws InvalidRequestException + { + ThriftValidation.validateColumnPath(table, column_path); + + QueryPath path = new QueryPath(column_path.column_family, column_path.super_column); + List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column); + List<ReadCommand> commands = new ArrayList<ReadCommand>(); + for (String key: keys) { - throw new NotFoundException(); + commands.add(new SliceByNamesReadCommand(table, key, path, nameAsList)); } - assert columns.size() == 1; - IColumn column = columns.iterator().next(); - if (column.isMarkedForDelete()) + Map<String, ColumnOrSuperColumn> columnFamiliesMap = new HashMap<String, ColumnOrSuperColumn>(); + Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level); + + for (ReadCommand command: commands) { - throw new NotFoundException(); + ColumnOrSuperColumn columnorsupercolumn; + + Collection<IColumn> columns = columnsMap.get(command.key); + if (columns == null) + { + columnorsupercolumn = new ColumnOrSuperColumn(); + } + else + { + assert columns.size() == 1; + IColumn column = columns.iterator().next(); + + + if (column.isMarkedForDelete()) + { + columnorsupercolumn = new ColumnOrSuperColumn(); + } + else + { + columnorsupercolumn = column instanceof org.apache.cassandra.db.Column + ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null) + : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns()))); + } + + } + columnFamiliesMap.put(command.key, columnorsupercolumn); } - return column instanceof org.apache.cassandra.db.Column - ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null) - : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns()))); + return columnFamiliesMap; } public int get_count(String table, String key, ColumnParent column_parent, int consistency_level) throws InvalidRequestException { if (logger.isDebugEnabled()) - logger.debug("get_column_count"); + logger.debug("get_count"); + return multigetCountInternal(table, Arrays.asList(key), column_parent, consistency_level).get(key); + } + + public Map<String, Integer> multiget_count(String table, List<String> keys, ColumnParent column_parent, int consistency_level) + throws InvalidRequestException + { + if (logger.isDebugEnabled()) + logger.debug("multiget_count"); + return multigetCountInternal(table, keys, column_parent, consistency_level); + + } + + private Map<String, Integer> multigetCountInternal(String table, List<String> keys, ColumnParent column_parent, int consistency_level) + throws InvalidRequestException + { // validateColumnParent assumes we require simple columns; g_c_c is the only // one of the columnParent-taking apis that can also work at the SC level. // so we roll a one-off validator here. @@ -273,31 +384,29 @@ throw new InvalidRequestException("columnfamily alone is required for standard CF " + column_parent.column_family); } - ColumnFamily cfamily; - cfamily = readColumnFamily(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE), consistency_level); - if (cfamily == null) + List<ReadCommand> commands = new ArrayList<ReadCommand>(); + for (String key: keys) { - return 0; + commands.add(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE)); } - Collection<IColumn> columns = null; - if (column_parent.super_column != null) + + Map<String, Integer> columnFamiliesMap = new HashMap<String, Integer>(); + Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level); + + for (ReadCommand command: commands) { - IColumn column = cfamily.getColumn(column_parent.super_column); - if (column != null) + Collection<IColumn> columns = columnsMap.get(command.key); + if(columns == null) { - columns = column.getSubColumns(); + columnFamiliesMap.put(command.key, 0); + } + else + { + columnFamiliesMap.put(command.key, columns.size()); } } - else - { - columns = cfamily.getSortedColumns(); - } - if (columns == null || columns.size() == 0) - { - return 0; - } - return columns.size(); - } + return columnFamiliesMap; + } public void insert(String table, String key, ColumnPath column_path, byte[] value, long timestamp, int consistency_level) throws InvalidRequestException, UnavailableException Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Aug 27 20:07:12 2009 @@ -41,6 +41,7 @@ private List<Message> responses_ = new ArrayList<Message>(); private IResponseResolver<T> responseResolver_; private AtomicBoolean done_ = new AtomicBoolean(false); + private long startTime_; public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException { @@ -51,6 +52,7 @@ condition_ = lock_.newCondition(); responseCount_ = responseCount; responseResolver_ = responseResolver; + startTime_ = System.currentTimeMillis(); } public T get() throws TimeoutException, DigestMismatchException @@ -62,8 +64,12 @@ try { if ( !done_.get() ) - { - bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + { + long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout(); + if(timeout > 0) + bVal = condition_.await(timeout, TimeUnit.MILLISECONDS); + else + bVal = false; } } catch ( InterruptedException ex ) Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Aug 27 20:07:12 2009 @@ -238,171 +238,101 @@ insertBlocking(rm, ConsistencyLevel.QUORUM); } - private static Map<String, Message> constructMessages(Map<String, ReadCommand> readMessages) throws IOException - { - Map<String, Message> messages = new HashMap<String, Message>(); - Set<String> keys = readMessages.keySet(); - for ( String key : keys ) - { - Message message = readMessages.get(key).makeReadMessage(); - messages.put(key, message); - } - return messages; - } - - private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Map<String, Message> messages) - { - Set<String> keys = endPoints.keySet(); - EndPoint[] eps = new EndPoint[keys.size()]; - Message[] msgs = new Message[keys.size()]; - - int i = 0; - for ( String key : keys ) - { - eps[i] = endPoints.get(key); - msgs[i] = messages.get(key); - ++i; - } - - IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(msgs, eps); - return iar; - } - - /** - * This is an implementation for the multiget version. - * @param readMessages map of key --> ReadMessage to be sent - * @return map of key --> Row - * @throws IOException - * @throws TimeoutException - */ - public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> readMessages) throws IOException,TimeoutException - { - Map<String, Row> rows = new HashMap<String, Row>(); - Set<String> keys = readMessages.keySet(); - /* Find all the suitable endpoints for the keys */ - Map<String, EndPoint> endPoints = StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] )); - /* Construct the messages to be sent out */ - Map<String, Message> messages = constructMessages(readMessages); - /* Dispatch the messages to the respective endpoints */ - IAsyncResult iar = dispatchMessages(endPoints, messages); - List<byte[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - - for ( byte[] body : results ) - { - DataInputBuffer bufIn = new DataInputBuffer(); - bufIn.reset(body, body.length); - ReadResponse response = ReadResponse.serializer().deserialize(bufIn); - Row row = response.row(); - rows.put(row.key(), row); - } - return rows; - } /** * Read the data from one replica. If there is no reply, read the data from another. In the event we get * the data we perform consistency checks and figure out if any repairs need to be done to the replicas. - * @param command the read to perform + * @param commands a set of commands to perform reads * @return the row associated with command.key * @throws Exception */ - private static Row weakReadRemote(ReadCommand command) throws IOException + private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException { - EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key); - assert endPoint != null; - Message message = command.makeReadMessage(); if (logger.isDebugEnabled()) - logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint); - message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); - IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint); - byte[] body; - try + logger.debug("weakreadlocal reading " + StringUtils.join(commands, ", ")); + + List<Row> rows = new ArrayList<Row>(); + List<IAsyncResult> iars = new ArrayList<IAsyncResult>(); + int commandIndex = 0; + + for (ReadCommand command: commands) { - body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key); + assert endPoint != null; + Message message = command.makeReadMessage(); + + if (logger.isDebugEnabled()) + logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint); + message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); + iars.add(MessagingService.getMessagingInstance().sendRR(message, endPoint)); } - catch (TimeoutException e) + + for (IAsyncResult iar: iars) { - throw new RuntimeException("error reading key " + command.key, e); - // TODO retry to a different endpoint? + byte[] body; + try + { + body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) + { + throw new RuntimeException("error reading key " + commands.get(commandIndex).key, e); + // TODO retry to a different endpoint? + } + DataInputBuffer bufIn = new DataInputBuffer(); + bufIn.reset(body, body.length); + ReadResponse response = ReadResponse.serializer().deserialize(bufIn); + if (response.row() != null) + rows.add(response.row()); + commandIndex++; } - DataInputBuffer bufIn = new DataInputBuffer(); - bufIn.reset(body, body.length); - ReadResponse response = ReadResponse.serializer().deserialize(bufIn); - return response.row(); + return rows; } /** * Performs the actual reading of a row out of the StorageService, fetching * a specific set of column names from a given column family. */ - public static Row readProtocol(ReadCommand command, int consistency_level) + public static List<Row> readProtocol(List<ReadCommand> commands, int consistency_level) throws IOException, TimeoutException, InvalidRequestException { long startTime = System.currentTimeMillis(); - Row row; - EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key); + List<Row> rows = new ArrayList<Row>(); if (consistency_level == ConsistencyLevel.ONE) { - boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint()); - //TODO: Throw InvalidRequest if we're in bootstrap mode? - if (foundLocal && !StorageService.instance().isBootstrapMode()) - { - row = weakReadLocal(command); - } - else + List<ReadCommand> localCommands = new ArrayList<ReadCommand>(); + List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>(); + + for (ReadCommand command: commands) { - row = weakReadRemote(command); + EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key); + boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint()); + //TODO: Throw InvalidRequest if we're in bootstrap mode? + if (foundLocal && !StorageService.instance().isBootstrapMode()) + { + localCommands.add(command); + } + else + { + remoteCommands.add(command); + } } + if (localCommands.size() > 0) + rows.addAll(weakReadLocal(localCommands)); + + if (remoteCommands.size() > 0) + rows.addAll(weakReadRemote(remoteCommands)); } else { assert consistency_level == ConsistencyLevel.QUORUM; - row = strongRead(command); + rows = strongRead(commands); } readStats.add(System.currentTimeMillis() - startTime); - return row; - } - - public static Map<String, Row> readProtocol(String[] keys, ReadCommand readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception - { - Map<String, Row> rows = new HashMap<String, Row>(); - switch ( consistencyLevel ) - { - case WEAK: - rows = weakReadProtocol(keys, readCommand); - break; - - case STRONG: - rows = strongReadProtocol(keys, readCommand); - break; - - default: - rows = weakReadProtocol(keys, readCommand); - break; - } - return rows; - } - - /** - * This is a multiget version of the above method. - */ - public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException - { - Map<String, Row> rows; - // TODO: throw a thrift exception if we do not have N nodes - Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>(); - for (String key : keys ) - { - ReadCommand[] readParameters = new ReadCommand[2]; - readParameters[0] = readCommand.copy(); - readParameters[1] = readCommand.copy(); - readParameters[1].setDigestQuery(true); - readMessages.put(key, readParameters); - } - rows = doStrongReadProtocol(readMessages); return rows; } @@ -418,80 +348,100 @@ * 7. else carry out read repair by getting data from all the nodes. // 5. return success */ - private static Row strongRead(ReadCommand command) throws IOException, TimeoutException, InvalidRequestException + private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException { - // TODO: throw a thrift exception if we do not have N nodes - assert !command.isDigestQuery(); - ReadCommand readMessageDigestOnly = command.copy(); - readMessageDigestOnly.setDigestQuery(true); - - Row row = null; - Message message = command.makeReadMessage(); - Message messageDigestOnly = readMessageDigestOnly.makeReadMessage(); - - IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(); - QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>( - DatabaseDescriptor.getQuorum(), - readResponseResolver); - EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key); - List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key))); - /* Remove the local storage endpoint from the list. */ - endpointList.remove(dataPoint); - EndPoint[] endPoints = new EndPoint[endpointList.size() + 1]; - Message messages[] = new Message[endpointList.size() + 1]; - - /* - * First message is sent to the node that will actually get - * the data for us. The other two replicas are only sent a - * digest query. - */ - endPoints[0] = dataPoint; - messages[0] = message; - if (logger.isDebugEnabled()) - logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint); - for (int i = 1; i < endPoints.length; i++) - { - EndPoint digestPoint = endpointList.get(i - 1); - endPoints[i] = digestPoint; - messages[i] = messageDigestOnly; + List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>(); + List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>(); + List<Row> rows = new ArrayList<Row>(); + + int commandIndex = 0; + + for (ReadCommand command: commands) + { + // TODO: throw a thrift exception if we do not have N nodes + assert !command.isDigestQuery(); + ReadCommand readMessageDigestOnly = command.copy(); + readMessageDigestOnly.setDigestQuery(true); + + Message message = command.makeReadMessage(); + Message messageDigestOnly = readMessageDigestOnly.makeReadMessage(); + + IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(); + QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>( + DatabaseDescriptor.getQuorum(), + readResponseResolver); + EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key); + List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key))); + /* Remove the local storage endpoint from the list. */ + endpointList.remove(dataPoint); + EndPoint[] endPoints = new EndPoint[endpointList.size() + 1]; + Message messages[] = new Message[endpointList.size() + 1]; + + /* + * First message is sent to the node that will actually get + * the data for us. The other two replicas are only sent a + * digest query. + */ + endPoints[0] = dataPoint; + messages[0] = message; if (logger.isDebugEnabled()) - logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint); + logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint); + for (int i = 1; i < endPoints.length; i++) + { + EndPoint digestPoint = endpointList.get(i - 1); + endPoints[i] = digestPoint; + messages[i] = messageDigestOnly; + if (logger.isDebugEnabled()) + logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint); + } + MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler); + quorumResponseHandlers.add(quorumResponseHandler); + commandEndPoints.add(endPoints); } - try + for (QuorumResponseHandler<Row> quorumResponseHandler: quorumResponseHandlers) { - MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler); + Row row = null; + ReadCommand command = commands.get(commandIndex); + try + { + long startTime2 = System.currentTimeMillis(); + row = quorumResponseHandler.get(); + if (row != null) + rows.add(row); - long startTime2 = System.currentTimeMillis(); - row = quorumResponseHandler.get(); - if (logger.isDebugEnabled()) - logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms."); - } - catch (DigestMismatchException ex) - { - if ( DatabaseDescriptor.getConsistencyCheck()) + if (logger.isDebugEnabled()) + logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms."); + } + catch (DigestMismatchException ex) { - IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(); - QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>( - DatabaseDescriptor.getQuorum(), - readResponseResolverRepair); - logger.info("DigestMismatchException: " + command.key); - Message messageRepair = command.makeReadMessage(); - MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, - quorumResponseHandlerRepair); - try - { - row = quorumResponseHandlerRepair.get(); - } - catch (DigestMismatchException e) + if ( DatabaseDescriptor.getConsistencyCheck()) { - // TODO should this be a thrift exception? - throw new RuntimeException("digest mismatch reading key " + command.key, e); + IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(); + QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>( + DatabaseDescriptor.getQuorum(), + readResponseResolverRepair); + logger.info("DigestMismatchException: " + command.key); + Message messageRepair = command.makeReadMessage(); + MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), + quorumResponseHandlerRepair); + try + { + row = quorumResponseHandlerRepair.get(); + if (row != null) + rows.add(row); + } + catch (DigestMismatchException e) + { + // TODO should this be a thrift exception? + throw new RuntimeException("digest mismatch reading key " + command.key, e); + } } } + commandIndex++; } - return row; + return rows; } private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException @@ -557,69 +507,6 @@ MessagingService.getMessagingInstance().sendRR(msgList, epList, quorumResponseHandlers); return quorumResponseHandlers; } - - /** - * This method performs the read from the replicas for a bunch of keys. - * @param readMessages map of key --> readMessage[] of two entries where - * the first entry is the readMessage for the data and the second - * is the entry for the digest - * @return map containing key ---> Row - * @throws IOException, TimeoutException - */ - private static Map<String, Row> doStrongReadProtocol(Map<String, ReadCommand[]> readMessages) throws IOException - { - Map<String, Row> rows = new HashMap<String, Row>(); - /* Construct the messages to be sent to the replicas */ - Map<String, Message[]> replicaMessages = constructReplicaMessages(readMessages); - /* Dispatch the messages to the different replicas */ - MultiQuorumResponseHandler cb = dispatchMessagesMulti(readMessages, replicaMessages); - try - { - Row[] rows2 = cb.get(); - for ( Row row : rows2 ) - { - rows.put(row.key(), row); - } - } - catch (TimeoutException e) - { - throw new RuntimeException("timeout reading keys " + StringUtils.join(rows.keySet(), ", "), e); - } - return rows; - } - - /** - * This version is used when results for multiple keys needs to be - * retrieved. - * - * @return a mapping of key --> Row - * @throws Exception - */ - public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception - { - Row row = null; - Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>(); - for ( String key : keys ) - { - ReadCommand readCmd = readCommand.copy(); - readMessages.put(key, readCmd); - } - /* Performs the multiget in parallel */ - Map<String, Row> rows = doReadProtocol(readMessages); - /* - * Do the consistency checks for the keys that are being queried - * in the background. - */ - for ( String key : keys ) - { - List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key); - /* Remove the local storage endpoint from the list. */ - endpoints.remove( StorageService.getLocalStorageEndPoint() ); - if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck()) - StorageService.instance().doConsistencyCheck(row, endpoints, readMessages.get(key)); - } - return rows; - } /* * This function executes the read protocol locally and should be used only if consistency is not a concern. @@ -627,25 +514,33 @@ * one of the other replicas (in the same data center if possible) till we get the data. In the event we get * the data we perform consistency checks and figure out if any repairs need to be done to the replicas. */ - private static Row weakReadLocal(ReadCommand command) throws IOException + private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException { - if (logger.isDebugEnabled()) - logger.debug("weakreadlocal reading " + command); - List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key); - /* Remove the local storage endpoint from the list. */ - endpoints.remove(StorageService.getLocalStorageEndPoint()); - // TODO: throw a thrift exception if we do not have N nodes - - Table table = Table.open(command.table); - Row row = command.getRow(table); - - /* - * Do the consistency checks in the background and return the - * non NULL row. - */ - if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck()) - StorageService.instance().doConsistencyCheck(row, endpoints, command); - return row; + List<Row> rows = new ArrayList<Row>(); + for (ReadCommand command: commands) + { + List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key); + /* Remove the local storage endpoint from the list. */ + endpoints.remove(StorageService.getLocalStorageEndPoint()); + // TODO: throw a thrift exception if we do not have N nodes + + if (logger.isDebugEnabled()) + logger.debug("weakreadlocal reading " + command); + + Table table = Table.open(command.table); + Row row = command.getRow(table); + if (row != null) + rows.add(row); + /* + * Do the consistency checks in the background and return the + * non NULL row. + */ + if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck()) + StorageService.instance().doConsistencyCheck(row, endpoints, command); + + } + + return rows; } static List<String> getKeyRange(RangeCommand rawCommand) throws IOException Modified: incubator/cassandra/trunk/test/system/test_server.py URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=808589&r1=808588&r2=808589&view=diff ============================================================================== --- incubator/cassandra/trunk/test/system/test_server.py (original) +++ incubator/cassandra/trunk/test/system/test_server.py Thu Aug 27 20:07:12 2009 @@ -35,14 +35,22 @@ Column(_i64(6), 'value6', 0)])] def _insert_simple(block=True): + return _insert_multi(['key1'], block) + +def _insert_batch(block): + return _insert_multi_batch(['key1'], block) + +def _insert_multi(keys, block=True): if block: consistencyLevel = ConsistencyLevel.ONE else: consistencyLevel = ConsistencyLevel.ZERO - client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel) - client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel) -def _insert_batch(block): + for key in keys: + client.insert('Keyspace1', key, ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel) + client.insert('Keyspace1', key, ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel) + +def _insert_multi_batch(keys, block): cfmap = {'Standard1': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS], 'Standard2': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]} if block: @@ -50,12 +58,17 @@ else: consistencyLevel = ConsistencyLevel.ZERO - client.batch_insert('Keyspace1', BatchMutation(key='key1', cfmap=cfmap), consistencyLevel) + for key in keys: + client.batch_insert('Keyspace1', BatchMutation(key=key, cfmap=cfmap), consistencyLevel) def _big_slice(keyspace, key, column_parent): p = SlicePredicate(slice_range=SliceRange('', '', False, 1000)) return client.get_slice(keyspace, key, column_parent, p, ConsistencyLevel.ONE) +def _big_multislice(keyspace, keys, column_parent): + p = SlicePredicate(slice_range=SliceRange('', '', False, 1000)) + return client.multiget_slice(keyspace, keys, column_parent, p, ConsistencyLevel.ONE) + def _verify_batch(): _verify_simple() L = [result.column @@ -492,3 +505,59 @@ result = client.get_slice('Keyspace1', 'key1', ColumnParent('Super1', 'sc1'), p, ConsistencyLevel.ONE) assert len(result) == 1 assert result[0].column.name == _i64(4) + + def test_multiget(self): + """Insert multiple keys and retrieve them using the multiget interface""" + + """Generate a list of 10 keys and insert them""" + num_keys = 10 + keys = ['key'+str(i) for i in range(1, num_keys+1)] + _insert_multi(keys) + + """Retrieve all 10 keys""" + rows = client.multiget('Keyspace1', keys, ColumnPath('Standard1', column='c1'), ConsistencyLevel.ONE) + keys1 = rows.keys().sort() + keys2 = keys.sort() + + """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted""" + for key in keys: + assert rows.has_key(key) == True + assert rows[key] == ColumnOrSuperColumn(column=Column(timestamp=0, name='c1', value='value1')) + + def test_multiget_slice(self): + """Insert multiple keys and retrieve them using the multiget_slice interface""" + + """Generate a list of 10 keys and insert them""" + num_keys = 10 + keys = ['key'+str(i) for i in range(1, num_keys+1)] + _insert_multi(keys) + + """Retrieve all 10 key slices""" + rows = _big_multislice('Keyspace1', keys, ColumnParent('Standard1')) + keys1 = rows.keys().sort() + keys2 = keys.sort() + + columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS] + """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted""" + for key in keys: + assert rows.has_key(key) == True + assert columns == rows[key] + + def test_multiget_count(self): + """Insert multiple keys and retrieve them using the multiget_count interface""" + + """Generate a list of 10 keys and insert them""" + num_keys = 10 + keys = ['key'+str(i) for i in range(1, num_keys+1)] + _insert_multi(keys) + + """Retrieve all 10 key slices""" + rows = client.multiget_count('Keyspace1', keys, ColumnParent('Standard1'), ConsistencyLevel.ONE) + keys1 = rows.keys().sort() + keys2 = keys.sort() + + columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS] + """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted""" + for key in keys: + assert rows.has_key(key) == True + assert rows[key] == 2 \ No newline at end of file
